Skip to content

[ENH] Config to disable compactor on collections #3469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ compaction_service:
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: [] # uuids to disable compaction for
blockfile_provider:
Arrow:
block_manager_config:
Expand Down
10 changes: 10 additions & 0 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ use chroma_system::{Component, ComponentContext, ComponentHandle, Handler, Syste
use chroma_types::CollectionUuid;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::str::FromStr;
use std::time::Duration;
use thiserror::Error;
use tracing::instrument;
use tracing::span;
use tracing::Instrument;
use tracing::Span;
use uuid::Uuid;

pub(crate) struct CompactionManager {
system: Option<System>,
Expand Down Expand Up @@ -220,6 +223,11 @@ impl Configurable<CompactionServiceConfig> for CompactionManager {
let min_compaction_size = config.compactor.min_compaction_size;
let max_compaction_size = config.compactor.max_compaction_size;
let max_partition_size = config.compactor.max_partition_size;
let mut disabled_collections =
HashSet::with_capacity(config.compactor.disabled_collections.len());
for collection_id_str in &config.compactor.disabled_collections {
disabled_collections.insert(CollectionUuid(Uuid::from_str(collection_id_str).unwrap()));
}

let assignment_policy_config = &config.assignment_policy;
let assignment_policy = match crate::assignment::from_config(assignment_policy_config).await
Expand All @@ -237,6 +245,7 @@ impl Configurable<CompactionServiceConfig> for CompactionManager {
max_concurrent_jobs,
min_compaction_size,
assignment_policy,
disabled_collections,
);

let blockfile_provider = BlockfileProvider::try_from_config(&(
Expand Down Expand Up @@ -519,6 +528,7 @@ mod tests {
max_concurrent_jobs,
min_compaction_size,
assignment_policy,
HashSet::new(),
);
// Set memberlist
scheduler.set_memberlist(vec![my_member_id.clone()]);
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/compactor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pub(crate) struct CompactorConfig {
pub(crate) min_compaction_size: usize,
pub(crate) max_compaction_size: usize,
pub(crate) max_partition_size: usize,
pub(crate) disabled_collections: Vec<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option<Vec<String>> for backwards compatibility/optional args.

}
89 changes: 89 additions & 0 deletions rust/worker/src/compactor/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
use std::collections::HashSet;
use std::str::FromStr;

use chroma_types::CollectionUuid;
use figment::providers::Env;
use figment::Figment;
use serde::Deserialize;
use uuid::Uuid;

use crate::assignment::assignment_policy::AssignmentPolicy;
use crate::compactor::scheduler_policy::SchedulerPolicy;
use crate::compactor::types::CompactionJob;
Expand All @@ -17,9 +26,16 @@ pub(crate) struct Scheduler {
min_compaction_size: usize,
memberlist: Option<Memberlist>,
assignment_policy: Box<dyn AssignmentPolicy>,
disabled_collections: HashSet<CollectionUuid>,
}

#[derive(Deserialize, Debug)]
struct RunTimeConfig {
disabled_collections: Vec<String>,
}

impl Scheduler {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
my_ip: String,
log: Box<Log>,
Expand All @@ -28,6 +44,7 @@ impl Scheduler {
max_concurrent_jobs: usize,
min_compaction_size: usize,
assignment_policy: Box<dyn AssignmentPolicy>,
disabled_collections: HashSet<CollectionUuid>,
) -> Scheduler {
Scheduler {
my_ip,
Expand All @@ -39,6 +56,7 @@ impl Scheduler {
max_concurrent_jobs,
memberlist: None,
assignment_policy,
disabled_collections,
}
}

Expand All @@ -63,6 +81,16 @@ impl Scheduler {
) -> Vec<CollectionRecord> {
let mut collection_records = Vec::new();
for collection_info in collections {
if self
.disabled_collections
.contains(&collection_info.collection_id)
{
tracing::info!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we bump to warning so that if we ever disable info we aren't in the dark?

"Ignoring collection: {:?} because it disabled for compaction",
collection_info.collection_id
);
continue;
}
let collection_id = Some(collection_info.collection_id);
// TODO: add a cache to avoid fetching the same collection multiple times
let result = self
Expand Down Expand Up @@ -161,13 +189,39 @@ impl Scheduler {
self.job_queue.extend(jobs);
}

pub(crate) fn recompute_disabled_collections(&mut self) {
let config = Figment::new()
.merge(
Env::prefixed("CHROMA_")
.map(|k| k.as_str().replace("__", ".").into())
.map(|k| {
if k == "COMPACTION_SERVICE.COMPACTOR.DISABLED_COLLECTIONS" {
k["COMPACTION_SERVICE.COMPACTOR.".len()..].into()
} else {
k.into()
}
})
.only(&["DISABLED_COLLECTIONS"]),
)
.extract::<RunTimeConfig>();
if let Ok(config) = config {
self.disabled_collections = config
.disabled_collections
.iter()
.map(|collection| CollectionUuid(Uuid::from_str(collection).unwrap()))
.collect();
}
}

pub(crate) async fn schedule(&mut self) {
// For now, we clear the job queue every time, assuming we will not have any pending jobs running
self.job_queue.clear();
if self.memberlist.is_none() || self.memberlist.as_ref().unwrap().is_empty() {
tracing::error!("Memberlist is not set or empty. Cannot schedule compaction jobs.");
return;
}
// Recompute disabled list.
self.recompute_disabled_collections();
let collections = self.get_collections_with_new_data().await;
if collections.is_empty() {
return;
Expand Down Expand Up @@ -300,6 +354,7 @@ mod tests {
max_concurrent_jobs,
1,
assignment_policy,
HashSet::new(),
);
// Scheduler does nothing without memberlist
scheduler.schedule().await;
Expand Down Expand Up @@ -338,6 +393,39 @@ mod tests {
assert_eq!(jobs[0].collection_id, collection_uuid_2,);
assert_eq!(jobs[1].collection_id, collection_uuid_1,);

// Set disable list.
std::env::set_var(
"CHROMA_COMPACTION_SERVICE__COMPACTOR__DISABLED_COLLECTIONS",
"[\"00000000-0000-0000-0000-000000000001\"]",
);
scheduler.schedule().await;
let jobs = scheduler.get_jobs();
let jobs = jobs.collect::<Vec<&CompactionJob>>();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].collection_id, collection_uuid_2,);
std::env::set_var(
"CHROMA_COMPACTION_SERVICE__COMPACTOR__DISABLED_COLLECTIONS",
"[]",
);
// Even . should work.
std::env::set_var(
"CHROMA_COMPACTION_SERVICE.COMPACTOR.DISABLED_COLLECTIONS",
"[\"00000000-0000-0000-0000-000000000002\"]",
);
std::env::set_var(
"CHROMA_COMPACTION_SERVICE.IRRELEVANT",
"[\"00000000-0000-0000-0000-000000000001\"]",
);
scheduler.schedule().await;
let jobs = scheduler.get_jobs();
let jobs = jobs.collect::<Vec<&CompactionJob>>();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].collection_id, collection_uuid_1,);
std::env::set_var(
"CHROMA_COMPACTION_SERVICE.COMPACTOR.DISABLED_COLLECTIONS",
"[]",
);

// Test filter_collections
let member_1 = "1".to_string();
let member_2 = "5".to_string();
Expand Down Expand Up @@ -477,6 +565,7 @@ mod tests {
max_concurrent_jobs,
1,
assignment_policy,
HashSet::new(),
);

scheduler.set_memberlist(vec![my_ip.clone()]);
Expand Down
46 changes: 46 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ mod tests {
use super::*;
use figment::Jail;
use serial_test::serial;
use uuid::Uuid;

#[test]
#[serial]
Expand Down Expand Up @@ -261,6 +262,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: ["74b3240e-a2b0-43d7-8adb-f55a394964a1", "496db4aa-fbe1-498a-b60b-81ec0fe59792"]
blockfile_provider:
Arrow:
block_manager_config:
Expand Down Expand Up @@ -289,6 +291,24 @@ mod tests {
"compaction-service-0"
);
assert_eq!(config.compaction_service.my_port, 50051);
assert_eq!(
config
.compaction_service
.compactor
.disabled_collections
.len(),
2
);
assert_eq!(
Uuid::parse_str(&config.compaction_service.compactor.disabled_collections[0])
.unwrap(),
Uuid::parse_str("74b3240e-a2b0-43d7-8adb-f55a394964a1").unwrap()
);
assert_eq!(
Uuid::parse_str(&config.compaction_service.compactor.disabled_collections[1])
.unwrap(),
Uuid::parse_str("496db4aa-fbe1-498a-b60b-81ec0fe59792").unwrap()
);
Ok(())
});
}
Expand Down Expand Up @@ -407,6 +427,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: []
blockfile_provider:
Arrow:
block_manager_config:
Expand Down Expand Up @@ -571,6 +592,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: []
blockfile_provider:
Arrow:
block_manager_config:
Expand Down Expand Up @@ -629,6 +651,10 @@ mod tests {
"CHROMA_COMPACTION_SERVICE__STORAGE__S3__REQUEST_TIMEOUT_MS",
1000,
);
jail.set_env(
"CHROMA_COMPACTION_SERVICE__COMPACTOR__DISABLED_COLLECTIONS",
"[\"74b3240e-a2b0-43d7-8adb-f55a394964a1\",\"496db4aa-fbe1-498a-b60b-81ec0fe59792\"]",
);
let _ = jail.create_file(
"chroma_config.yaml",
r#"
Expand Down Expand Up @@ -722,6 +748,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: ["c92e4d75-eb25-4295-82d8-7c53dbd33258"]
blockfile_provider:
Arrow:
block_manager_config:
Expand Down Expand Up @@ -763,6 +790,24 @@ mod tests {
}
_ => panic!("Invalid storage config"),
}
assert_eq!(
config
.compaction_service
.compactor
.disabled_collections
.len(),
2
);
assert_eq!(
Uuid::parse_str(&config.compaction_service.compactor.disabled_collections[0])
.unwrap(),
Uuid::parse_str("74b3240e-a2b0-43d7-8adb-f55a394964a1").unwrap()
);
assert_eq!(
Uuid::parse_str(&config.compaction_service.compactor.disabled_collections[1])
.unwrap(),
Uuid::parse_str("496db4aa-fbe1-498a-b60b-81ec0fe59792").unwrap()
);
Ok(())
});
}
Expand Down Expand Up @@ -884,6 +929,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: []
blockfile_provider:
Arrow:
block_manager_config:
Expand Down
Loading