Skip to content

Commit 101f0b3

Browse files
sanketkediagithub-actions[bot]
authored andcommitted
[ENH]: Return database id in get collections call from sysdb (#4686)
## Description of changes _Summarize the changes made by this PR._ - Improvements & Bug fixes - Organizing data of collections in s3 by s3 prefixes requires knowledge about database id in the write path. This PR populates and returns database id from sysdb in get_collections rpc - We skip serializing and returning this from FE to client - Even though the collection model is the same for both local and distributed, we don't set it in local currently. However, it can be done if needed - New functionality - ... ## Test plan _How are these changes tested?_ - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None
1 parent 08c6526 commit 101f0b3

File tree

8 files changed

+84
-84
lines changed

8 files changed

+84
-84
lines changed

go/pkg/sysdb/grpc/proto_model_convert.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func convertCollectionToProto(collection *model.Collection) *coordinatorpb.Colle
4242
return nil
4343
}
4444

45+
dbId := collection.DatabaseId.String()
4546
collectionpb := &coordinatorpb.Collection{
4647
Id: collection.ID.String(),
4748
Name: collection.Name,
@@ -60,6 +61,7 @@ func convertCollectionToProto(collection *model.Collection) *coordinatorpb.Colle
6061
Seconds: collection.UpdatedAt,
6162
Nanos: 0,
6263
},
64+
DatabaseId: &dbId,
6365
}
6466

6567
if collection.RootCollectionID != nil {

idl/chromadb/proto/chroma.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ message Collection {
6464
optional string root_collection_id = 14;
6565
optional string lineage_file_path = 15;
6666
google.protobuf.Timestamp updated_at = 16;
67+
// This is the database id of the collection.
68+
optional string database_id = 17;
6769
}
6870

6971
message Database {

rust/garbage_collector/src/helper.rs

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@ use chroma_types::chroma_proto::log_service_client::LogServiceClient;
22
use chroma_types::chroma_proto::query_executor_client::QueryExecutorClient;
33
use chroma_types::chroma_proto::sys_db_client::SysDbClient;
44
use chroma_types::chroma_proto::{
5-
Collection, CreateCollectionRequest, CreateDatabaseRequest, CreateTenantRequest,
6-
FilterOperator, GetCollectionWithSegmentsRequest, GetPlan, KnnOperator, KnnPlan,
7-
KnnProjectionOperator, LimitOperator, ListCollectionVersionsRequest,
5+
CreateCollectionRequest, CreateDatabaseRequest, CreateTenantRequest, FilterOperator,
6+
GetCollectionWithSegmentsRequest, GetPlan, LimitOperator, ListCollectionVersionsRequest,
87
ListCollectionVersionsResponse, OperationRecord, ProjectionOperator, PushLogsRequest,
98
ScanOperator, Segment, SegmentScope, Vector,
109
};
@@ -163,74 +162,6 @@ impl ChromaGrpcClients {
163162
}
164163
}
165164

166-
#[allow(dead_code)]
167-
pub async fn query_collection(
168-
&mut self,
169-
collection_id: &str,
170-
query_embedding: Vec<f32>,
171-
) -> Result<Vec<(String, f32)>, Box<dyn std::error::Error>> {
172-
// Convert f32 vector to bytes
173-
let vector_bytes: Vec<u8> = query_embedding
174-
.iter()
175-
.flat_map(|&x| x.to_le_bytes().to_vec())
176-
.collect();
177-
178-
let knn_plan = KnnPlan {
179-
scan: Some(ScanOperator {
180-
collection: Some(Collection {
181-
id: collection_id.to_string(),
182-
name: String::new(),
183-
database: String::new(),
184-
tenant: String::new(),
185-
dimension: Some(query_embedding.len() as i32),
186-
configuration_json_str: String::new(),
187-
metadata: None,
188-
log_position: 0,
189-
version: 0,
190-
total_records_post_compaction: 0,
191-
size_bytes_post_compaction: 0,
192-
last_compaction_time_secs: 0,
193-
version_file_path: None,
194-
root_collection_id: None,
195-
lineage_file_path: None,
196-
updated_at: None,
197-
}),
198-
knn: None,
199-
metadata: None,
200-
record: None,
201-
}),
202-
filter: None,
203-
knn: Some(KnnOperator {
204-
embeddings: vec![Vector {
205-
dimension: query_embedding.len() as i32,
206-
vector: vector_bytes,
207-
encoding: 0,
208-
}],
209-
fetch: 2,
210-
}),
211-
projection: Some(KnnProjectionOperator {
212-
projection: Some(ProjectionOperator {
213-
document: false,
214-
embedding: false,
215-
metadata: false,
216-
}),
217-
distance: true,
218-
}),
219-
};
220-
221-
let response = self.query_executor.knn(knn_plan).await?;
222-
let results = response.into_inner().results;
223-
224-
let mut id_distances = Vec::new();
225-
for result in results {
226-
for record in result.records {
227-
id_distances.push((record.record.unwrap().id, record.distance.unwrap_or(0.0)));
228-
}
229-
}
230-
231-
Ok(id_distances)
232-
}
233-
234165
pub async fn get_records(
235166
&mut self,
236167
collection_id: String,

rust/sysdb/src/sqlite.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ use chroma_sqlite::table;
99
use chroma_types::{
1010
Collection, CollectionAndSegments, CollectionMetadataUpdate, CollectionUuid,
1111
CreateCollectionError, CreateCollectionResponse, CreateDatabaseError, CreateDatabaseResponse,
12-
CreateTenantError, CreateTenantResponse, Database, DeleteCollectionError, DeleteDatabaseError,
13-
DeleteDatabaseResponse, GetCollectionWithSegmentsError, GetCollectionsError, GetDatabaseError,
14-
GetSegmentsError, GetTenantError, GetTenantResponse, InternalCollectionConfiguration,
15-
ListDatabasesError, Metadata, MetadataValue, ResetError, ResetResponse, Segment, SegmentScope,
16-
SegmentType, SegmentUuid, UpdateCollectionConfiguration, UpdateCollectionError,
12+
CreateTenantError, CreateTenantResponse, Database, DatabaseUuid, DeleteCollectionError,
13+
DeleteDatabaseError, DeleteDatabaseResponse, GetCollectionWithSegmentsError,
14+
GetCollectionsError, GetDatabaseError, GetSegmentsError, GetTenantError, GetTenantResponse,
15+
InternalCollectionConfiguration, ListDatabasesError, Metadata, MetadataValue, ResetError,
16+
ResetResponse, Segment, SegmentScope, SegmentType, SegmentUuid, UpdateCollectionConfiguration,
17+
UpdateCollectionError,
1718
};
1819
use futures::TryStreamExt;
1920
use sea_query_binder::SqlxBinder;
@@ -289,6 +290,8 @@ impl SqliteSysDb {
289290
_ => CreateCollectionError::Internal(e.into()),
290291
})?;
291292
let database_id = database_result.get::<&str, _>(0);
293+
let database_uuid = DatabaseUuid::from_str(database_id)
294+
.map_err(|_| CreateCollectionError::DatabaseIdParseError)?;
292295

293296
sqlx::query(
294297
r#"
@@ -343,6 +346,7 @@ impl SqliteSysDb {
343346
root_collection_id: None,
344347
lineage_file_path: None,
345348
updated_at: SystemTime::UNIX_EPOCH,
349+
database_id: database_uuid,
346350
})
347351
}
348352

@@ -671,6 +675,7 @@ impl SqliteSysDb {
671675
.column((table::Collections::Table, table::Collections::Dimension))
672676
.column((table::Databases::Table, table::Databases::TenantId))
673677
.column((table::Databases::Table, table::Databases::Name))
678+
.column((table::Collections::Table, table::Collections::DatabaseId))
674679
.columns([
675680
table::CollectionMetadata::Key,
676681
table::CollectionMetadata::StrValue,
@@ -720,6 +725,10 @@ impl SqliteSysDb {
720725
}
721726
None => InternalCollectionConfiguration::default_hnsw(),
722727
};
728+
let database_id = match DatabaseUuid::from_str(first_row.get(6)) {
729+
Ok(db_id) => db_id,
730+
Err(_) => return Some(Err(GetCollectionsError::DatabaseId)),
731+
};
723732

724733
Some(Ok(Collection {
725734
collection_id,
@@ -738,6 +747,7 @@ impl SqliteSysDb {
738747
root_collection_id: None,
739748
lineage_file_path: None,
740749
updated_at: SystemTime::UNIX_EPOCH,
750+
database_id,
741751
}))
742752
})
743753
.collect::<Result<Vec<_>, GetCollectionsError>>()?;

rust/sysdb/src/sysdb.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use chroma_types::{
2020
};
2121
use chroma_types::{
2222
BatchGetCollectionSoftDeleteStatusError, BatchGetCollectionVersionFilePathsError, Collection,
23-
CollectionConversionError, CollectionUuid, CountForksError, FinishDatabaseDeletionError,
24-
FlushCompactionResponse, FlushCompactionResponseConversionError, ForkCollectionError, Segment,
25-
SegmentConversionError, SegmentScope, Tenant,
23+
CollectionConversionError, CollectionUuid, CountForksError, DatabaseUuid,
24+
FinishDatabaseDeletionError, FlushCompactionResponse, FlushCompactionResponseConversionError,
25+
ForkCollectionError, Segment, SegmentConversionError, SegmentScope, Tenant,
2626
};
2727
use std::collections::HashMap;
2828
use std::fmt::Debug;
@@ -266,6 +266,7 @@ impl SysDb {
266266
root_collection_id: None,
267267
lineage_file_path: None,
268268
updated_at: SystemTime::now(),
269+
database_id: DatabaseUuid::new(),
269270
};
270271

271272
test_sysdb.add_collection(collection.clone());

rust/types/src/api_types.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,8 @@ pub enum CreateCollectionError {
646646
SpannNotImplemented,
647647
#[error("HNSW is not supported on this platform")]
648648
HnswNotSupported,
649+
#[error("Failed to parse db id")]
650+
DatabaseIdParseError,
649651
}
650652

651653
impl ChromaError for CreateCollectionError {
@@ -662,6 +664,7 @@ impl ChromaError for CreateCollectionError {
662664
CreateCollectionError::Aborted(_) => ErrorCodes::Aborted,
663665
CreateCollectionError::SpannNotImplemented => ErrorCodes::InvalidArgument,
664666
CreateCollectionError::HnswNotSupported => ErrorCodes::InvalidArgument,
667+
CreateCollectionError::DatabaseIdParseError => ErrorCodes::Internal,
665668
}
666669
}
667670
}
@@ -688,6 +691,8 @@ pub enum GetCollectionsError {
688691
Configuration(#[from] serde_json::Error),
689692
#[error("Could not deserialize collection ID")]
690693
CollectionId(#[from] uuid::Error),
694+
#[error("Could not deserialize database ID")]
695+
DatabaseId,
691696
}
692697

693698
impl ChromaError for GetCollectionsError {
@@ -696,6 +701,7 @@ impl ChromaError for GetCollectionsError {
696701
GetCollectionsError::Internal(err) => err.code(),
697702
GetCollectionsError::Configuration(_) => ErrorCodes::Internal,
698703
GetCollectionsError::CollectionId(_) => ErrorCodes::Internal,
704+
GetCollectionsError::DatabaseId => ErrorCodes::Internal,
699705
}
700706
}
701707
}

rust/types/src/collection.rs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::str::FromStr;
2+
13
use super::{Metadata, MetadataValueConversionError};
24
use crate::{
35
chroma_proto, test_segment, CollectionConfiguration, InternalCollectionConfiguration, Segment,
@@ -30,6 +32,29 @@ use pyo3::types::PyAnyMethods;
3032
)]
3133
pub struct CollectionUuid(pub Uuid);
3234

35+
/// DatabaseUuid is a wrapper around Uuid to provide a type for the database id.
36+
#[derive(
37+
Copy,
38+
Clone,
39+
Debug,
40+
Default,
41+
Deserialize,
42+
Eq,
43+
PartialEq,
44+
Ord,
45+
PartialOrd,
46+
Hash,
47+
Serialize,
48+
ToSchema,
49+
)]
50+
pub struct DatabaseUuid(pub Uuid);
51+
52+
impl DatabaseUuid {
53+
pub fn new() -> Self {
54+
DatabaseUuid(Uuid::new_v4())
55+
}
56+
}
57+
3358
impl CollectionUuid {
3459
pub fn new() -> Self {
3560
CollectionUuid(Uuid::new_v4())
@@ -51,6 +76,17 @@ impl std::str::FromStr for CollectionUuid {
5176
}
5277
}
5378

79+
impl std::str::FromStr for DatabaseUuid {
80+
type Err = uuid::Error;
81+
82+
fn from_str(s: &str) -> Result<Self, Self::Err> {
83+
match Uuid::parse_str(s) {
84+
Ok(uuid) => Ok(DatabaseUuid(uuid)),
85+
Err(err) => Err(err),
86+
}
87+
}
88+
}
89+
5490
impl std::fmt::Display for CollectionUuid {
5591
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
5692
write!(f, "{}", self.0)
@@ -107,6 +143,8 @@ pub struct Collection {
107143
pub lineage_file_path: Option<String>,
108144
#[serde(skip, default = "SystemTime::now")]
109145
pub updated_at: SystemTime,
146+
#[serde(skip)]
147+
pub database_id: DatabaseUuid,
110148
}
111149

112150
impl Default for Collection {
@@ -128,6 +166,7 @@ impl Default for Collection {
128166
root_collection_id: None,
129167
lineage_file_path: None,
130168
updated_at: SystemTime::now(),
169+
database_id: DatabaseUuid::new(),
131170
}
132171
}
133172
}
@@ -218,11 +257,8 @@ impl TryFrom<chroma_proto::Collection> for Collection {
218257
type Error = CollectionConversionError;
219258

220259
fn try_from(proto_collection: chroma_proto::Collection) -> Result<Self, Self::Error> {
221-
let collection_uuid = match Uuid::try_parse(&proto_collection.id) {
222-
Ok(uuid) => uuid,
223-
Err(_) => return Err(CollectionConversionError::InvalidUuid),
224-
};
225-
let collection_id = CollectionUuid(collection_uuid);
260+
let collection_id = CollectionUuid::from_str(&proto_collection.id)
261+
.map_err(|_| CollectionConversionError::InvalidUuid)?;
226262
let collection_metadata: Option<Metadata> = match proto_collection.metadata {
227263
Some(proto_metadata) => match proto_metadata.try_into() {
228264
Ok(metadata) => Some(metadata),
@@ -238,6 +274,12 @@ impl TryFrom<chroma_proto::Collection> for Collection {
238274
}
239275
None => SystemTime::now(),
240276
};
277+
// TOOD(Sanket): this should be updated to error with "missing field" once all SysDb deployments are up-to-date
278+
let database_id = match proto_collection.database_id {
279+
Some(db_id) => DatabaseUuid::from_str(&db_id)
280+
.map_err(|_| CollectionConversionError::InvalidUuid)?,
281+
None => DatabaseUuid::new(),
282+
};
241283
Ok(Collection {
242284
collection_id,
243285
name: proto_collection.name,
@@ -257,6 +299,7 @@ impl TryFrom<chroma_proto::Collection> for Collection {
257299
.map(|uuid| CollectionUuid(Uuid::try_parse(&uuid).unwrap())),
258300
lineage_file_path: proto_collection.lineage_file_path,
259301
updated_at,
302+
database_id,
260303
})
261304
}
262305
}
@@ -296,6 +339,7 @@ impl TryFrom<Collection> for chroma_proto::Collection {
296339
root_collection_id: value.root_collection_id.map(|uuid| uuid.0.to_string()),
297340
lineage_file_path: value.lineage_file_path,
298341
updated_at: Some(value.updated_at.into()),
342+
database_id: Some(value.database_id.0.to_string()),
299343
})
300344
}
301345
}
@@ -348,6 +392,7 @@ mod test {
348392
seconds: 1,
349393
nanos: 1,
350394
}),
395+
database_id: Some("00000000-0000-0000-0000-000000000000".to_string()),
351396
};
352397
let converted_collection: Collection = proto_collection.try_into().unwrap();
353398
assert_eq!(
@@ -378,6 +423,7 @@ mod test {
378423
converted_collection.updated_at,
379424
SystemTime::UNIX_EPOCH + Duration::new(1, 1)
380425
);
426+
assert_eq!(converted_collection.database_id, DatabaseUuid(Uuid::nil()));
381427
}
382428

383429
#[test]

rust/worker/src/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ mod tests {
513513

514514
fn scan() -> chroma_proto::ScanOperator {
515515
let collection_id = Uuid::new_v4().to_string();
516+
let database_id = Uuid::new_v4().to_string();
516517
chroma_proto::ScanOperator {
517518
collection: Some(chroma_proto::Collection {
518519
id: collection_id.clone(),
@@ -522,6 +523,7 @@ mod tests {
522523
dimension: None,
523524
tenant: "test-tenant".to_string(),
524525
database: "test-database".to_string(),
526+
database_id: Some(database_id.clone()),
525527
..Default::default()
526528
}),
527529
knn: Some(chroma_proto::Segment {

0 commit comments

Comments
 (0)