@@ -16,6 +16,7 @@ use chroma_system::{
16
16
use chroma_types:: CollectionUuid ;
17
17
use chrono:: { DateTime , Utc } ;
18
18
use futures:: { stream:: FuturesUnordered , StreamExt } ;
19
+ use opentelemetry:: metrics:: { Counter , Histogram } ;
19
20
use std:: {
20
21
collections:: { HashMap , HashSet } ,
21
22
fmt:: { Debug , Formatter } ,
@@ -38,6 +39,10 @@ pub(crate) struct GarbageCollector {
38
39
system : Option < chroma_system:: System > ,
39
40
default_cleanup_mode : CleanupMode ,
40
41
tenant_mode_overrides : Option < HashMap < String , CleanupMode > > ,
42
+ total_jobs_metric : Counter < u64 > ,
43
+ job_duration_ms_metric : Histogram < u64 > ,
44
+ total_files_deleted_metric : Counter < u64 > ,
45
+ total_versions_deleted_metric : Counter < u64 > ,
41
46
}
42
47
43
48
impl Debug for GarbageCollector {
@@ -66,6 +71,8 @@ impl GarbageCollector {
66
71
default_cleanup_mode : CleanupMode ,
67
72
tenant_mode_overrides : Option < HashMap < String , CleanupMode > > ,
68
73
) -> Self {
74
+ let meter = opentelemetry:: global:: meter ( "chroma" ) ;
75
+
69
76
Self {
70
77
gc_interval_mins,
71
78
relative_cutoff_time,
@@ -77,6 +84,23 @@ impl GarbageCollector {
77
84
system : None ,
78
85
default_cleanup_mode,
79
86
tenant_mode_overrides,
87
+ total_jobs_metric : meter
88
+ . u64_counter ( "garbage_collector.total_jobs" )
89
+ . with_description ( "Total number of garbage collection jobs executed" )
90
+ . build ( ) ,
91
+ job_duration_ms_metric : meter
92
+ . u64_histogram ( "garbage_collector.job_duration_ms" )
93
+ . with_description ( "Duration of garbage collection jobs in milliseconds" )
94
+ . with_unit ( "ms" )
95
+ . build ( ) ,
96
+ total_files_deleted_metric : meter
97
+ . u64_counter ( "garbage_collector.total_files_deleted" )
98
+ . with_description ( "Total number of files deleted during garbage collection" )
99
+ . build ( ) ,
100
+ total_versions_deleted_metric : meter
101
+ . u64_counter ( "garbage_collector.total_versions_deleted" )
102
+ . with_description ( "Total number of versions deleted during garbage collection" )
103
+ . build ( ) ,
80
104
}
81
105
}
82
106
@@ -107,7 +131,28 @@ impl GarbageCollector {
107
131
) ;
108
132
109
133
if let Some ( system) = self . system . as_ref ( ) {
110
- return Ok ( orchestrator. run ( system. clone ( ) ) . await ?) ;
134
+ let started_at = SystemTime :: now ( ) ;
135
+ let result = orchestrator. run ( system. clone ( ) ) . await ?;
136
+ let duration_ms = started_at
137
+ . elapsed ( )
138
+ . map ( |d| d. as_millis ( ) as u64 )
139
+ . unwrap_or ( 0 ) ;
140
+ self . job_duration_ms_metric . record ( duration_ms, & [ ] ) ;
141
+ self . total_files_deleted_metric . add (
142
+ result. deletion_list . len ( ) as u64 ,
143
+ & [ opentelemetry:: KeyValue :: new (
144
+ "cleanup_mode" ,
145
+ format ! ( "{:?}" , cleanup_mode) ,
146
+ ) ] ,
147
+ ) ;
148
+ self . total_versions_deleted_metric . add (
149
+ result. num_versions_deleted as u64 ,
150
+ & [ opentelemetry:: KeyValue :: new (
151
+ "cleanup_mode" ,
152
+ format ! ( "{:?}" , cleanup_mode) ,
153
+ ) ] ,
154
+ ) ;
155
+ return Ok ( result) ;
111
156
}
112
157
}
113
158
@@ -221,6 +266,15 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
221
266
num_failed_jobs
222
267
) ;
223
268
269
+ self . total_jobs_metric . add (
270
+ num_completed_jobs as u64 ,
271
+ & [ opentelemetry:: KeyValue :: new ( "status" , "success" ) ] ,
272
+ ) ;
273
+ self . total_jobs_metric . add (
274
+ num_failed_jobs as u64 ,
275
+ & [ opentelemetry:: KeyValue :: new ( "status" , "failure" ) ] ,
276
+ ) ;
277
+
224
278
// Schedule next run
225
279
ctx. scheduler . schedule (
226
280
GarbageCollectMessage { } ,
0 commit comments