Skip to content

[CLN] Deprecate delete list file in GC #4266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 3 additions & 158 deletions rust/garbage_collector/src/operators/delete_unused_files.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,14 @@
use crate::types::CleanupMode;
use crate::types::{DELETE_LIST_FILE_PREFIX, RENAMED_FILE_PREFIX};
use crate::types::RENAMED_FILE_PREFIX;
use async_trait::async_trait;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_index::HNSW_INDEX_S3_PREFIX;
use chroma_storage::{PutOptions, Storage};
use chroma_storage::Storage;
use chroma_system::{Operator, OperatorType};
use futures::stream::StreamExt;
use std::collections::HashSet;
use thiserror::Error;

/// Collects two lists of file paths and renders them as a plain-text report.
///
/// The report always starts with a `Deleted Files:` header followed by the
/// recorded files, one per line, in insertion order. If any failed files were
/// recorded, a `Failed files:` section listing them in sorted order follows.
struct DeletionListBuilder {
    /// Paths listed under the `Deleted Files:` header, in insertion order.
    files: Vec<String>,
    /// Paths listed under the `Failed files:` header; sorted when rendering.
    failed_files: Vec<String>,
}

impl DeletionListBuilder {
    /// Creates a builder with nothing recorded yet.
    fn new() -> Self {
        DeletionListBuilder {
            files: vec![],
            failed_files: vec![],
        }
    }

    /// Appends copies of `files` to the main list and hands the builder back
    /// so calls can be chained.
    fn add_files(mut self, files: &[String]) -> Self {
        self.files.extend_from_slice(files);
        self
    }

    /// Appends copies of `failed_files` to the failure list and hands the
    /// builder back so calls can be chained.
    fn add_failed_files(mut self, failed_files: &[String]) -> Self {
        self.failed_files.extend_from_slice(failed_files);
        self
    }

    /// Consumes the builder and returns the rendered report text.
    fn build(self) -> String {
        let Self {
            files: listed,
            failed_files: mut failures,
        } = self;

        let mut report = String::from("Deleted Files:\n");
        report += &listed.join("\n");

        // The failure section is omitted entirely when nothing failed.
        if failures.is_empty() {
            return report;
        }

        // Only the failure section is sorted; the main list keeps its order.
        failures.sort();
        report += "\n\nFailed files:\n";
        report += &failures.join("\n");
        report
    }
}

#[derive(Clone)]
pub struct DeleteUnusedFilesOperator {
storage: Storage,
Expand Down Expand Up @@ -84,44 +47,6 @@ impl DeleteUnusedFilesOperator {
)
}

/// Builds the storage path of the deletion list for `timestamp`:
/// `DELETE_LIST_FILE_PREFIX`, then this operator's collection id, then
/// `/<timestamp>.txt`.
fn get_deletion_list_path(&self, timestamp: i64) -> String {
    let collection = &self.collection_id;
    format!(
        "{}{}/{}.txt",
        DELETE_LIST_FILE_PREFIX, collection, timestamp
    )
}

/// Renders the deletion list and writes it to storage.
///
/// The content is produced by `DeletionListBuilder` from `files` and
/// `failed_files`; the destination comes from `get_deletion_list_path`
/// for `timestamp`.
///
/// # Errors
///
/// Returns `DeleteUnusedFilesError::WriteListError`, carrying the target path
/// and the stringified storage error, when the `put_bytes` call fails.
async fn write_deletion_list(
    &self,
    files: &[String],
    timestamp: i64,
    failed_files: &[String],
) -> Result<(), DeleteUnusedFilesError> {
    let path = self.get_deletion_list_path(timestamp);
    let payload = DeletionListBuilder::new()
        .add_files(files)
        .add_failed_files(failed_files)
        .build()
        .into_bytes();

    tracing::info!(
        path = %path,
        file_count = files.len(),
        failed_count = failed_files.len(),
        "Writing deletion list to S3"
    );

    let outcome = self
        .storage
        .put_bytes(&path, payload, PutOptions::default())
        .await;

    // The success value of the write is not needed; only failure is reported.
    if let Err(e) = outcome {
        return Err(DeleteUnusedFilesError::WriteListError {
            path,
            message: e.to_string(),
        });
    }

    Ok(())
}

async fn delete_with_path(&self, file_path: String) -> Result<(), FileOperationError> {
self.storage
.delete(&file_path)
Expand Down Expand Up @@ -267,10 +192,6 @@ impl Operator<DeleteUnusedFilesInput, DeleteUnusedFilesOutput> for DeleteUnusedF
}
}

// Write the deletion list with any potential failed files
self.write_deletion_list(&all_files, input.epoch_id, &file_operation_errors)
.await?;

tracing::debug!(
"File deletion operation completed with {} file operation errors",
file_operation_errors.len()
Expand Down Expand Up @@ -326,7 +247,7 @@ mod tests {
async fn test_dry_run_mode() {
let tmp_dir = TempDir::new().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let (test_files, hnsw_files) = setup_test_files(&storage).await;
let (test_files, _) = setup_test_files(&storage).await;

let mut unused_files = HashSet::new();
unused_files.extend(test_files.clone());
Expand All @@ -344,28 +265,11 @@ mod tests {

let result = operator.run(&input).await.unwrap();

// Verify deletion list file was created
let deletion_list_path = tmp_dir.path().join(format!(
"{}test_collection/123.txt",
DELETE_LIST_FILE_PREFIX
));
assert!(deletion_list_path.exists());

// Verify original files still exist
for file in &test_files {
assert!(result.deleted_files.contains(file));
assert!(Path::new(&tmp_dir.path().join(file)).exists());
}

// Read and verify deletion list content
let content = std::fs::read_to_string(deletion_list_path).unwrap();
let listed_files: HashSet<_> = content.lines().collect();
for file in &test_files {
assert!(listed_files.contains(file.as_str()));
}
for file in &hnsw_files {
assert!(listed_files.contains(file.as_str()));
}
}

#[tokio::test]
Expand All @@ -390,13 +294,6 @@ mod tests {

let result = operator.run(&input).await.unwrap();

// Verify deletion list was created
let deletion_list_path = tmp_dir.path().join(format!(
"{}test_collection/123.txt",
DELETE_LIST_FILE_PREFIX
));
assert!(deletion_list_path.exists());

// Verify regular files were moved to deleted directory
for file in &test_files {
let original_path = tmp_dir.path().join(file);
Expand All @@ -420,16 +317,6 @@ mod tests {
assert!(new_path.exists());
assert!(result.deleted_files.contains(file));
}

// Verify deletion list contents
let content = std::fs::read_to_string(deletion_list_path).unwrap();
let listed_files: HashSet<_> = content.lines().collect();
for file in &test_files {
assert!(listed_files.contains(file.as_str()));
}
for file in &hnsw_files {
assert!(listed_files.contains(file.as_str()));
}
}

#[tokio::test]
Expand All @@ -454,13 +341,6 @@ mod tests {

let result = operator.run(&input).await.unwrap();

// Verify deletion list was created
let deletion_list_path = tmp_dir.path().join(format!(
"{}test_collection/123.txt",
DELETE_LIST_FILE_PREFIX
));
assert!(deletion_list_path.exists());

// Verify regular files were deleted
for file in &test_files {
assert!(!Path::new(&tmp_dir.path().join(file)).exists());
Expand All @@ -472,16 +352,6 @@ mod tests {
assert!(!Path::new(&tmp_dir.path().join(file)).exists());
assert!(result.deleted_files.contains(file));
}

// Verify deletion list contents
let content = std::fs::read_to_string(deletion_list_path).unwrap();
let listed_files: HashSet<_> = content.lines().collect();
for file in &test_files {
assert!(listed_files.contains(file.as_str()));
}
for file in &hnsw_files {
assert!(listed_files.contains(file.as_str()));
}
}

#[tokio::test]
Expand All @@ -507,15 +377,6 @@ mod tests {
.await;
assert!(result.is_ok());

// Verify deletion list contains the error
let deletion_list_path = tmp_dir.path().join(format!(
"{}test_collection/123.txt",
DELETE_LIST_FILE_PREFIX
));
let content = std::fs::read_to_string(deletion_list_path).unwrap();
assert!(content.contains("Failed files:"));
assert!(content.contains("nonexistent.txt"));

// Test Rename mode - should succeed but record the error in deletion list
let rename_operator = DeleteUnusedFilesOperator::new(
storage.clone(),
Expand All @@ -531,15 +392,6 @@ mod tests {
.await;
assert!(result.is_ok());

// Verify deletion list contains the error
let deletion_list_path = tmp_dir.path().join(format!(
"{}test_collection/124.txt",
DELETE_LIST_FILE_PREFIX
));
let content = std::fs::read_to_string(deletion_list_path).unwrap();
assert!(content.contains("Failed files:"));
assert!(content.contains("nonexistent.txt"));

// Test DryRun mode with nonexistent files (should succeed)
let list_operator = DeleteUnusedFilesOperator::new(
storage,
Expand All @@ -554,12 +406,5 @@ mod tests {
})
.await;
assert!(result.is_ok());

// Verify deletion list was created even for nonexistent files
let deletion_list_path = tmp_dir.path().join(format!(
"{}test_collection/125.txt",
DELETE_LIST_FILE_PREFIX
));
assert!(deletion_list_path.exists());
}
}
2 changes: 0 additions & 2 deletions rust/garbage_collector/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Prefix GC uses for the new name when it renames an S3 file.
pub(crate) const RENAMED_FILE_PREFIX: &str = "gc/renamed/";
// Prefix under which GC writes the text file listing the files that would be deleted.
pub(crate) const DELETE_LIST_FILE_PREFIX: &str = "gc/delete-list/";

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize, Default)]
#[serde(rename_all = "lowercase")]
Expand Down
Loading