Skip to content

[ENH] Gracefully shutdown GC system #4246

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rust/garbage_collector/src/garbage_collector_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ impl Component for GarbageCollector {
|| Some(span!(parent: None, tracing::Level::INFO, "Scheduled garbage collection")),
);
}

fn on_stop_timeout(&self) -> Duration {
// NOTE: Increased timeout for remaining jobs to finish
Duration::from_secs(60)
}
Comment on lines +182 to +185
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: should we make this configurable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but ok doing that as a follow-up

}

#[derive(Debug)]
Expand Down
45 changes: 24 additions & 21 deletions rust/garbage_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use chroma_tracing::{
use config::GarbageCollectorConfig;
use garbage_collector_component::GarbageCollector;
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::{sleep, Duration};
use tracing::{debug, error, info};

mod config;
Expand Down Expand Up @@ -56,7 +55,7 @@ pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::e
})?;

let system = System::new();
let dispatcher_handle = system.start_component(dispatcher);
let mut dispatcher_handle = system.start_component(dispatcher);

// Start a background task to periodically check for garbage.
// Garbage collector is a component that gets notified every
Expand All @@ -68,36 +67,40 @@ pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::e
e
})?;

garbage_collector_component.set_dispatcher(dispatcher_handle);
garbage_collector_component.set_dispatcher(dispatcher_handle.clone());
garbage_collector_component.set_system(system.clone());

let _ = system.start_component(garbage_collector_component);
let mut garbage_collector_handle = system.start_component(garbage_collector_component);

// Keep the service running and handle shutdown signals
let mut sigterm = signal(SignalKind::terminate())?;
let mut sigint = signal(SignalKind::interrupt())?;

info!("Service running, waiting for signals");
loop {
tokio::select! {
_ = sigterm.recv() => {
info!("Received SIGTERM signal");
break;
}
_ = sigint.recv() => {
info!("Received SIGINT signal");
break;
}
_ = sleep(Duration::from_secs(1)) => {
// Keep the service running
continue;
}
tokio::select! {
_ = sigterm.recv() => {
info!("Received SIGTERM signal");
}
_ = sigint.recv() => {
info!("Received SIGINT signal");
}
}

// Give some time for any in-progress garbage collection to complete
info!("Starting graceful shutdown, waiting for in-progress tasks");
sleep(Duration::from_secs(5)).await;
// NOTE: We should first stop the garbage collector. The garbage collector will finish the remaining jobs before shutdown.
// We cannot directly shutdown the dispatcher and system because that will fail remaining jobs.
garbage_collector_handle.stop();
garbage_collector_handle
.join()
.await
.expect("Garbage collector should be stoppable");
dispatcher_handle.stop();
dispatcher_handle
.join()
.await
.expect("Dispatcher should be stoppable");
system.stop().await;
system.join().await;

info!("Shutting down garbage collector service");
Ok(())
}
Loading