Skip to content

Commit 08c6526

Browse files
[HOTFIX] applying PR #4867 to release/2025-06-13 (#4875)
This PR cherry-picks the commit 6e5ec8f onto release/2025-06-13. If there are unresolved conflicts, please resolve them manually. Co-authored-by: Max Isom <[email protected]>
1 parent bd9dbee commit 08c6526

9 files changed

+58
-43
lines changed

rust/garbage_collector/src/construct_version_graph_orchestrator.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ use petgraph::{dot::Dot, graph::DiGraph};
3030
use std::{
3131
collections::{HashMap, HashSet},
3232
str::FromStr,
33+
sync::Arc,
3334
};
3435
use thiserror::Error;
3536
use tokio::sync::oneshot::{error::RecvError, Sender};
@@ -55,7 +56,7 @@ pub struct ConstructVersionGraphOrchestrator {
5556
lineage_file_path: Option<String>,
5657

5758
version_dependencies: Vec<VersionDependency>,
58-
version_files: HashMap<CollectionUuid, CollectionVersionFile>,
59+
version_files: HashMap<CollectionUuid, Arc<CollectionVersionFile>>,
5960
num_pending_tasks: usize,
6061
}
6162

@@ -88,7 +89,7 @@ impl ConstructVersionGraphOrchestrator {
8889
#[derive(Debug)]
8990
#[allow(dead_code)]
9091
pub struct ConstructVersionGraphResponse {
91-
pub version_files: HashMap<CollectionUuid, CollectionVersionFile>,
92+
pub version_files: HashMap<CollectionUuid, Arc<CollectionVersionFile>>,
9293
pub graph: VersionGraph,
9394
}
9495

rust/garbage_collector/src/garbage_collector_orchestrator.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,8 +37,6 @@
3737
//! - Input: Version file, versions to delete, unused S3 files
3838
//! - Output: Deletion confirmation
3939
40-
use std::fmt::{Debug, Formatter};
41-
4240
use crate::types::{CleanupMode, GarbageCollectorResponse};
4341
use async_trait::async_trait;
4442
use chroma_error::{ChromaError, ErrorCodes};
@@ -51,6 +49,8 @@ use chroma_system::{
5149
use chroma_types::chroma_proto::CollectionVersionFile;
5250
use chroma_types::CollectionUuid;
5351
use chrono::{DateTime, Utc};
52+
use std::fmt::{Debug, Formatter};
53+
use std::sync::Arc;
5454
use thiserror::Error;
5555
use tokio::sync::oneshot::{error::RecvError, Sender};
5656
use tracing::Span;
@@ -87,7 +87,7 @@ pub struct GarbageCollectorOrchestrator {
8787
dispatcher: ComponentHandle<Dispatcher>,
8888
storage: Storage,
8989
result_channel: Option<Sender<Result<GarbageCollectorResponse, GarbageCollectorError>>>,
90-
pending_version_file: Option<CollectionVersionFile>,
90+
pending_version_file: Option<Arc<CollectionVersionFile>>,
9191
pending_versions_to_delete: Option<chroma_types::chroma_proto::VersionListForCollection>,
9292
pending_epoch_id: Option<i64>,
9393
num_versions_deleted: u32,

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ use chrono::{DateTime, Utc};
4040
use petgraph::algo::toposort;
4141
use std::collections::{HashMap, HashSet};
4242
use std::str::FromStr;
43+
use std::sync::Arc;
4344
use std::time::SystemTime;
4445
use thiserror::Error;
4546
use tokio::sync::oneshot::{error::RecvError, Sender};
@@ -59,7 +60,7 @@ pub struct GarbageCollectorOrchestrator {
5960
root_manager: RootManager,
6061
result_channel: Option<Sender<Result<GarbageCollectorResponse, GarbageCollectorError>>>,
6162
cleanup_mode: CleanupMode,
62-
version_files: HashMap<CollectionUuid, CollectionVersionFile>,
63+
version_files: HashMap<CollectionUuid, Arc<CollectionVersionFile>>,
6364
versions_to_delete_output: Option<ComputeVersionsToDeleteOutput>,
6465
pending_mark_versions_at_sysdb_tasks: HashSet<CollectionUuid>,
6566
pending_list_files_at_version_tasks: HashSet<(CollectionUuid, i64)>,

rust/garbage_collector/src/operators/compute_unused_files.rs

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,10 @@ use chroma_types::{
88
chroma_proto::{CollectionSegmentInfo, CollectionVersionFile, VersionListForCollection},
99
HNSW_PATH,
1010
};
11-
use std::collections::{HashMap, HashSet};
11+
use std::{
12+
collections::{HashMap, HashSet},
13+
sync::Arc,
14+
};
1215
use thiserror::Error;
1316
use uuid::Uuid;
1417

@@ -171,7 +174,7 @@ impl ComputeUnusedFilesOperator {
171174

172175
#[derive(Clone)]
173176
pub struct ComputeUnusedFilesInput {
174-
pub version_file: CollectionVersionFile,
177+
pub version_file: Arc<CollectionVersionFile>,
175178
pub versions_to_delete: VersionListForCollection,
176179
pub oldest_version_to_keep: i64,
177180
}
@@ -239,7 +242,9 @@ impl Operator<ComputeUnusedFilesInput, ComputeUnusedFilesOutput> for ComputeUnus
239242
// Build a map from version to segment_info
240243
let mut version_to_segment_info = HashMap::new();
241244
let version_history = version_file
245+
.as_ref()
242246
.version_history
247+
.as_ref()
243248
.ok_or(ComputeUnusedFilesError::MissingVersionHistory)?;
244249

245250
// Check if version history is empty
@@ -465,7 +470,7 @@ mod tests {
465470

466471
let input = ComputeUnusedFilesInput {
467472
oldest_version_to_keep: 3,
468-
version_file: chroma_proto::CollectionVersionFile {
473+
version_file: Arc::new(chroma_proto::CollectionVersionFile {
469474
collection_info_immutable: None,
470475
version_history: Some(CollectionVersionHistory {
471476
versions: vec![
@@ -520,7 +525,7 @@ mod tests {
520525
},
521526
],
522527
}),
523-
},
528+
}),
524529
versions_to_delete: chroma_proto::VersionListForCollection {
525530
versions: vec![1, 2],
526531
collection_id: "test_collection".to_string(),
@@ -570,12 +575,12 @@ mod tests {
570575

571576
// Create input with missing version info
572577
let input = ComputeUnusedFilesInput {
573-
version_file: chroma_proto::CollectionVersionFile {
578+
version_file: Arc::new(chroma_proto::CollectionVersionFile {
574579
collection_info_immutable: None,
575580
version_history: Some(CollectionVersionHistory {
576581
versions: vec![], // Empty version history wrapped in Some
577582
}),
578-
},
583+
}),
579584
versions_to_delete: chroma_proto::VersionListForCollection {
580585
versions: vec![1, 2], // Versions that don't exist in history
581586
collection_id: "test_collection".to_string(),
@@ -606,7 +611,7 @@ mod tests {
606611
);
607612

608613
let input = ComputeUnusedFilesInput {
609-
version_file: chroma_proto::CollectionVersionFile {
614+
version_file: Arc::new(chroma_proto::CollectionVersionFile {
610615
collection_info_immutable: None,
611616
version_history: Some(CollectionVersionHistory {
612617
versions: vec![
@@ -639,7 +644,7 @@ mod tests {
639644
},
640645
],
641646
}),
642-
},
647+
}),
643648
versions_to_delete: chroma_proto::VersionListForCollection {
644649
versions: vec![1, 2], // Try to delete 2 versions
645650
collection_id: "test_collection".to_string(),

rust/garbage_collector/src/operators/compute_versions_to_delete.rs

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -4,22 +4,22 @@ use chroma_system::{Operator, OperatorType};
44
use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection};
55
use chrono::{DateTime, Utc};
66
use humantime::format_duration;
7-
use std::time::Duration;
7+
use std::{sync::Arc, time::Duration};
88
use thiserror::Error;
99

1010
#[derive(Clone, Debug)]
1111
pub struct ComputeVersionsToDeleteOperator {}
1212

1313
#[derive(Debug)]
1414
pub struct ComputeVersionsToDeleteInput {
15-
pub version_file: CollectionVersionFile,
15+
pub version_file: Arc<CollectionVersionFile>,
1616
pub cutoff_time: DateTime<Utc>,
1717
pub min_versions_to_keep: u32,
1818
}
1919

2020
#[derive(Debug)]
2121
pub struct ComputeVersionsToDeleteOutput {
22-
pub version_file: CollectionVersionFile,
22+
pub version_file: Arc<CollectionVersionFile>,
2323
pub versions_to_delete: VersionListForCollection,
2424
pub oldest_version_to_keep: i64,
2525
}
@@ -54,7 +54,7 @@ impl Operator<ComputeVersionsToDeleteInput, ComputeVersionsToDeleteOutput>
5454
&self,
5555
input: &ComputeVersionsToDeleteInput,
5656
) -> Result<ComputeVersionsToDeleteOutput, ComputeVersionsToDeleteError> {
57-
let mut version_file = input.version_file.clone();
57+
let version_file = input.version_file.clone();
5858
let collection_info = version_file
5959
.collection_info_immutable
6060
.as_ref()
@@ -73,6 +73,7 @@ impl Operator<ComputeVersionsToDeleteInput, ComputeVersionsToDeleteOutput>
7373
let mut marked_versions = Vec::new();
7474
let mut oldest_version_to_keep = 0;
7575

76+
let mut version_file = version_file.as_ref().clone();
7677
if let Some(ref mut version_history) = version_file.version_history {
7778
let mut unique_versions_seen = 0;
7879
let mut last_version = None;
@@ -149,7 +150,7 @@ impl Operator<ComputeVersionsToDeleteInput, ComputeVersionsToDeleteOutput>
149150
);
150151

151152
Ok(ComputeVersionsToDeleteOutput {
152-
version_file,
153+
version_file: Arc::new(version_file),
153154
versions_to_delete,
154155
oldest_version_to_keep,
155156
})
@@ -198,7 +199,7 @@ mod tests {
198199
],
199200
};
200201

201-
let version_file = CollectionVersionFile {
202+
let version_file = Arc::new(CollectionVersionFile {
202203
version_history: Some(version_history),
203204
collection_info_immutable: Some(CollectionInfoImmutable {
204205
tenant_id: "test_tenant".to_string(),
@@ -207,7 +208,7 @@ mod tests {
207208
dimension: 0,
208209
..Default::default()
209210
}),
210-
};
211+
});
211212

212213
let input = ComputeVersionsToDeleteInput {
213214
version_file,
@@ -221,7 +222,12 @@ mod tests {
221222
.unwrap();
222223

223224
// Verify the results.
224-
let versions = &result.version_file.version_history.unwrap().versions;
225+
let versions = &result
226+
.version_file
227+
.version_history
228+
.as_ref()
229+
.unwrap()
230+
.versions;
225231
assert!(versions[0].marked_for_deletion);
226232
assert!(versions[1].marked_for_deletion);
227233
assert!(!versions[2].marked_for_deletion); // Version 2 should be kept.

rust/garbage_collector/src/operators/delete_versions_at_sysdb.rs

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ use chroma_sysdb::SysDb;
55
use chroma_system::{Operator, OperatorType};
66
use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection};
77
use futures::stream::StreamExt;
8-
use std::collections::HashSet;
8+
use std::{collections::HashSet, sync::Arc};
99
use thiserror::Error;
1010

1111
#[derive(Clone)]
@@ -22,7 +22,7 @@ impl std::fmt::Debug for DeleteVersionsAtSysDbOperator {
2222

2323
#[derive(Debug)]
2424
pub struct DeleteVersionsAtSysDbInput {
25-
pub version_file: CollectionVersionFile,
25+
pub version_file: Arc<CollectionVersionFile>,
2626
pub epoch_id: i64,
2727
pub sysdb_client: SysDb,
2828
pub versions_to_delete: VersionListForCollection,
@@ -31,7 +31,7 @@ pub struct DeleteVersionsAtSysDbInput {
3131

3232
#[derive(Debug)]
3333
pub struct DeleteVersionsAtSysDbOutput {
34-
pub version_file: CollectionVersionFile,
34+
pub version_file: Arc<CollectionVersionFile>,
3535
pub versions_to_delete: VersionListForCollection,
3636
pub unused_s3_files: HashSet<String>,
3737
}
@@ -209,10 +209,10 @@ mod tests {
209209
let sysdb = SysDb::Test(TestSysDb::new());
210210

211211
// Create a version file with actual version history
212-
let version_file = CollectionVersionFile {
212+
let version_file = Arc::new(CollectionVersionFile {
213213
version_history: Some(chroma_proto::CollectionVersionHistory { versions: vec![] }),
214214
..Default::default()
215-
};
215+
});
216216

217217
let versions_to_delete = VersionListForCollection {
218218
collection_id: "test_collection".to_string(),
@@ -245,7 +245,7 @@ mod tests {
245245
let tmp_dir = TempDir::new().unwrap();
246246
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
247247
let sysdb = SysDb::Test(TestSysDb::new());
248-
let version_file = CollectionVersionFile::default();
248+
let version_file = Arc::new(CollectionVersionFile::default());
249249
let versions_to_delete = VersionListForCollection {
250250
collection_id: "test_collection".to_string(),
251251
database_id: "default".to_string(),
@@ -368,7 +368,7 @@ mod tests {
368368
}
369369

370370
// Create version file with history
371-
let version_file = CollectionVersionFile {
371+
let version_file = Arc::new(CollectionVersionFile {
372372
version_history: Some(chroma_proto::CollectionVersionHistory {
373373
versions: vec![
374374
chroma_proto::CollectionVersionInfo {
@@ -386,7 +386,7 @@ mod tests {
386386
],
387387
}),
388388
..Default::default()
389-
};
389+
});
390390

391391
let versions_to_delete = VersionListForCollection {
392392
collection_id: "test_collection".to_string(),
@@ -439,7 +439,7 @@ mod tests {
439439
}
440440

441441
// Create version file with history
442-
let version_file = CollectionVersionFile {
442+
let version_file = Arc::new(CollectionVersionFile {
443443
version_history: Some(chroma_proto::CollectionVersionHistory {
444444
versions: vec![
445445
chroma_proto::CollectionVersionInfo {
@@ -457,7 +457,7 @@ mod tests {
457457
],
458458
}),
459459
..Default::default()
460-
};
460+
});
461461

462462
let versions_to_delete = VersionListForCollection {
463463
collection_id: "test_collection".to_string(),

rust/garbage_collector/src/operators/fetch_version_file.rs

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ use chroma_types::CollectionUuid;
1717
use prost::Message;
1818
use std::fmt::{Debug, Formatter};
1919
use std::str::FromStr;
20+
use std::sync::Arc;
2021
use thiserror::Error;
2122

2223
#[derive(Clone, Debug, Default)]
@@ -53,7 +54,7 @@ impl Debug for FetchVersionFileInput {
5354
#[allow(dead_code)]
5455
#[derive(Debug)]
5556
pub struct FetchVersionFileOutput {
56-
pub file: CollectionVersionFile,
57+
pub file: Arc<CollectionVersionFile>,
5758
pub collection_id: CollectionUuid,
5859
}
5960

@@ -128,7 +129,7 @@ impl Operator<FetchVersionFileInput, FetchVersionFileOutput> for FetchVersionFil
128129
.map_err(FetchVersionFileError::InvalidUuid)?;
129130

130131
Ok(FetchVersionFileOutput {
131-
file: version_file,
132+
file: Arc::new(version_file),
132133
collection_id,
133134
})
134135
}
@@ -212,7 +213,7 @@ mod tests {
212213
let result = operator.run(&input).await.expect("Failed to run operator");
213214

214215
// Verify the content
215-
assert_eq!(result.file, test_file);
216+
assert_eq!(result.file, test_file.into());
216217

217218
// Cleanup - Note: object_store doesn't have a delete method,
218219
// but the test bucket should be cleaned up between test runs

rust/garbage_collector/src/operators/list_files_at_version.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3,22 +3,22 @@ use chroma_blockstore::{arrow::provider::RootManagerError, RootManager};
33
use chroma_storage::StorageError;
44
use chroma_system::{Operator, OperatorType};
55
use chroma_types::{chroma_proto::CollectionVersionFile, CollectionUuid, HNSW_PATH};
6-
use std::{collections::HashSet, str::FromStr};
6+
use std::{collections::HashSet, str::FromStr, sync::Arc};
77
use thiserror::Error;
88
use tokio::task::{JoinError, JoinSet};
99
use uuid::Uuid;
1010

1111
#[derive(Debug)]
1212
pub struct ListFilesAtVersionInput {
1313
root_manager: RootManager,
14-
version_file: CollectionVersionFile,
14+
version_file: Arc<CollectionVersionFile>,
1515
version: i64,
1616
}
1717

1818
impl ListFilesAtVersionInput {
1919
pub fn new(
2020
root_manager: RootManager,
21-
version_file: CollectionVersionFile,
21+
version_file: Arc<CollectionVersionFile>,
2222
version: i64,
2323
) -> Self {
2424
Self {

0 commit comments

Comments
 (0)