Skip to content

Commit 216fc54

Browse files
Tanuj Nayak
authored and committed
added metrics to block manager
1 parent 84a18de commit 216fc54

File tree

7 files changed

+110
-27
lines changed

7 files changed

+110
-27
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/blockstore/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,12 +23,14 @@ tokio = { workspace = true }
2323
num_cpus = { workspace = true }
2424
flatbuffers = { workspace = true }
2525
itertools = { workspace = true }
26+
opentelemetry = { workspace = true }
2627

2728
chroma-error = { workspace = true }
2829
chroma-config = { workspace = true }
2930
chroma-storage = { workspace = true }
3031
chroma-cache = { workspace = true }
3132
chroma-types = { workspace = true }
33+
chroma-tracing = { workspace = true }
3234

3335
[dev-dependencies]
3436
criterion = { workspace = true }

rust/blockstore/src/arrow/flusher.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,10 +4,11 @@ use super::{
44
root::RootWriter,
55
types::{ArrowWriteableKey, ArrowWriteableValue},
66
};
7-
use chroma_error::ChromaError;
87
use futures::{StreamExt, TryStreamExt};
98
use uuid::Uuid;
109

10+
use chroma_error::ChromaError;
11+
1112
pub struct ArrowBlockfileFlusher {
1213
block_manager: BlockManager,
1314
root_manager: RootManager,

rust/blockstore/src/arrow/provider.rs

Lines changed: 42 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,9 @@ use chroma_error::{ChromaError, ErrorCodes};
2020
use chroma_storage::{
2121
admissioncontrolleds3::StorageRequestPriority, GetOptions, PutOptions, Storage,
2222
};
23+
use chroma_tracing::util::{get_current_trace_id, Stopwatch};
2324
use futures::{stream::FuturesUnordered, StreamExt};
25+
use opentelemetry::{global, KeyValue};
2426
use std::{
2527
collections::HashMap,
2628
sync::Arc,
@@ -263,6 +265,36 @@ impl ChromaError for ForkError {
263265
}
264266
}
265267

268+
/// OpenTelemetry instruments stored on `BlockManager` (its `block_metrics` field).
///
/// All three instruments are `u64` histograms. They are constructed by the
/// `Default` impl from the global meter named "chroma".
///
/// NOTE(review): at the visible call sites every recording is tagged with a
/// `trace_id` attribute, which makes the attribute set effectively unique per
/// trace — confirm the metrics backend tolerates that cardinality.
#[derive(Clone)]
269+
pub struct BlockMetrics {
270+
/// Passed to a `Stopwatch` at the top of the `BlockManager` method that
/// turns a `Delta` into a `Block`; the stopwatch records into it on drop.
pub commit_latency: opentelemetry::metrics::Histogram<u64>,
271+
/// Receives `record(1, ..)` after a block write to storage, in the match arm
/// preceding the `Err` arm (presumably the success path).
/// NOTE(review): only the constant 1 is ever recorded in the visible code,
/// so a counter may be what is intended here — confirm.
pub num_blocks_flushed: opentelemetry::metrics::Histogram<u64>,
272+
/// Passed to a `Stopwatch` created just before the block bytes are written
/// to storage under the `block/{id}` key.
pub flush_latency: opentelemetry::metrics::Histogram<u64>,
273+
}
274+
275+
/// Builds the block-manager histograms from the process-global OpenTelemetry
/// meter registered under the name "chroma".
///
/// `BlockManager`'s constructor calls `BlockMetrics::default()`, so this runs
/// once per `BlockManager` value that is constructed.
impl Default for BlockMetrics {
276+
fn default() -> Self {
277+
// Uses whatever global meter provider is installed at call time.
let meter = global::meter("chroma");
278+
Self {
279+
// Latency histogram for the delta -> block commit step.
// NOTE(review): the unit is declared as "microseconds"; the visible part
// of `Stopwatch::drop` only shows `record(elapsed, ..)` — confirm that
// `elapsed` is actually measured in microseconds.
commit_latency: meter
280+
.u64_histogram("block_commit_latency")
281+
.with_description("Commit latency")
282+
.with_unit("microseconds")
283+
.build(),
284+
// Recorded with the value 1 per flushed block at the visible call site.
num_blocks_flushed: meter
285+
.u64_histogram("block_num_blocks_flushed")
286+
.with_description("Number of blocks flushed")
287+
.with_unit("blocks")
288+
.build(),
289+
// Latency histogram for writing a block to storage; same unit caveat as
// `commit_latency` above.
flush_latency: meter
290+
.u64_histogram("block_flush_latency")
291+
.with_description("Flush latency")
292+
.with_unit("microseconds")
293+
.build(),
294+
}
295+
}
296+
}
297+
266298
/// A simple local cache of Arrow-backed blocks, the blockfile provider passes this
267299
/// to the ArrowBlockfile when it creates a new blockfile. So that the blockfile can manage and access blocks
268300
/// # Note
@@ -274,6 +306,7 @@ pub(super) struct BlockManager {
274306
block_cache: Arc<dyn PersistentCache<Uuid, Block>>,
275307
storage: Storage,
276308
max_block_size_bytes: usize,
309+
block_metrics: BlockMetrics,
277310
}
278311

279312
impl BlockManager {
@@ -287,6 +320,7 @@ impl BlockManager {
287320
block_cache,
288321
storage,
289322
max_block_size_bytes,
323+
block_metrics: BlockMetrics::default(),
290324
}
291325
}
292326

@@ -317,6 +351,9 @@ impl BlockManager {
317351
&self,
318352
delta: impl Delta,
319353
) -> Block {
354+
let trace_id = get_current_trace_id().to_string();
355+
let attribute = [KeyValue::new("trace_id", trace_id)];
356+
let _stopwatch = Stopwatch::new(&self.block_metrics.commit_latency, &attribute);
320357
let delta_id = delta.id();
321358
let record_batch = delta.finish::<K, V>(None);
322359
let block = Block::from_record_batch(delta_id, record_batch);
@@ -387,6 +424,10 @@ impl BlockManager {
387424
}
388425
};
389426
let key = format!("block/{}", block.id);
427+
428+
let trace_id = get_current_trace_id().to_string();
429+
let attribute = [KeyValue::new("trace_id", trace_id)];
430+
let _stopwatch = Stopwatch::new(&self.block_metrics.flush_latency, &attribute);
390431
let block_bytes_len = bytes.len();
391432
let res = self
392433
.storage
@@ -403,6 +444,7 @@ impl BlockManager {
403444
block.id,
404445
block_bytes_len
405446
);
447+
self.block_metrics.num_blocks_flushed.record(1, &attribute);
406448
}
407449
Err(e) => {
408450
tracing::info!("Error writing block to storage {}", e);

rust/frontend/src/tower_tracing.rs

Lines changed: 2 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,12 @@
11
use axum::extract::MatchedPath;
22
use axum::http::{header, Request, Response};
33
use axum::Router;
4+
use chroma_tracing::util::get_current_trace_id;
45
use futures::future::BoxFuture;
5-
use opentelemetry::trace::TraceContextExt;
66
use std::time::Duration;
77
use tower::Service;
88
use tower_http::trace::{MakeSpan, OnResponse, TraceLayer};
99

10-
use tracing_opentelemetry::OpenTelemetrySpanExt;
11-
1210
#[derive(Clone)]
1311
struct RequestTracing;
1412
impl<B> MakeSpan<B> for RequestTracing {
@@ -81,12 +79,7 @@ where
8179
Box::pin(async move {
8280
let mut response: Response<Rs> = future.await?;
8381
if response.status().is_client_error() || response.status().is_server_error() {
84-
let trace_id = tracing::Span::current()
85-
.context()
86-
.span()
87-
.span_context()
88-
.trace_id()
89-
.to_string();
82+
let trace_id = get_current_trace_id().to_string();
9083
let headers = response.headers_mut();
9184
let header_val = trace_id.parse::<header::HeaderValue>();
9285
if let Ok(val) = header_val {

rust/storage/src/s3.rs

Lines changed: 55 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -29,12 +29,14 @@ use bytes::Bytes;
2929
use chroma_config::registry::Registry;
3030
use chroma_config::Configurable;
3131
use chroma_error::ChromaError;
32+
use chroma_tracing::util::get_current_trace_id;
3233
use futures::future::BoxFuture;
3334
use futures::stream;
3435
use futures::FutureExt;
3536
use futures::Stream;
3637
use futures::StreamExt;
3738
use opentelemetry::metrics::Meter;
39+
use opentelemetry::KeyValue;
3840
use rand::Rng;
3941
use std::clone::Clone;
4042
use std::ops::Range;
@@ -45,15 +47,27 @@ use tracing::Instrument;
4547

4648
#[derive(Clone, Debug)]
4749
pub struct S3StorageMetrics {
48-
total_num_get_requests: opentelemetry::metrics::Counter<u64>,
50+
num_get_requests: opentelemetry::metrics::Histogram<u64>,
51+
num_put_requests: opentelemetry::metrics::Histogram<u64>,
52+
num_delete_requests: opentelemetry::metrics::Histogram<u64>,
4953
}
5054

5155
impl S3StorageMetrics {
5256
pub fn new(meter: Meter) -> Self {
5357
Self {
54-
total_num_get_requests: meter
55-
.u64_counter("s3_storage_total_num_get_requests")
56-
.with_description("The number of GET requests to S3. This does not include metadata HEAD requests.")
58+
num_get_requests: meter
59+
.u64_histogram("s3_num_get_requests")
60+
.with_description("The number of GET requests to S3.")
61+
.with_unit("requests")
62+
.build(),
63+
num_put_requests: meter
64+
.u64_histogram("s3_num_put_requests")
65+
.with_description("The number of PUT requests to S3.")
66+
.with_unit("requests")
67+
.build(),
68+
num_delete_requests: meter
69+
.u64_histogram("s3_num_delete_requests")
70+
.with_description("The number of DELETE requests to S3.")
5771
.with_unit("requests")
5872
.build(),
5973
}
@@ -144,7 +158,10 @@ impl S3Storage {
144158
.send()
145159
.instrument(tracing::trace_span!("cold S3 get"))
146160
.await;
147-
self.metrics.total_num_get_requests.add(1, &[]);
161+
162+
let trace_id = get_current_trace_id().to_string();
163+
let attribute = [KeyValue::new("trace_id", trace_id)];
164+
self.metrics.num_get_requests.record(1, &attribute);
148165
match res {
149166
Ok(res) => {
150167
let byte_stream = res.body;
@@ -200,6 +217,9 @@ impl S3Storage {
200217
.key(key)
201218
.send()
202219
.await;
220+
let trace_id = get_current_trace_id().to_string();
221+
let attribute = [KeyValue::new("trace_id", trace_id)];
222+
self.metrics.num_get_requests.record(1, &attribute);
203223
let (content_length, e_tag) = match head_res {
204224
Ok(res) => match res.content_length {
205225
Some(len) => (len, res.e_tag),
@@ -245,7 +265,9 @@ impl S3Storage {
245265
.send()
246266
.instrument(tracing::trace_span!("cold S3 get"))
247267
.await;
248-
self.metrics.total_num_get_requests.add(1, &[]);
268+
let trace_id = get_current_trace_id().to_string();
269+
let attribute = [KeyValue::new("trace_id", trace_id)];
270+
self.metrics.num_get_requests.record(1, &attribute);
249271
match res {
250272
Ok(output) => Ok(output),
251273
Err(e) => {
@@ -342,7 +364,6 @@ impl S3Storage {
342364
key: &str,
343365
) -> Result<(Arc<Vec<u8>>, Option<ETag>), StorageError> {
344366
let (mut stream, e_tag) = self.get_stream_and_e_tag(key).await?;
345-
let read_block_span = tracing::trace_span!("S3 read bytes to end");
346367
let buf = async {
347368
let mut buf: Vec<u8> = Vec::new();
348369
while let Some(res) = stream.next().await {
@@ -358,7 +379,6 @@ impl S3Storage {
358379
}
359380
Ok(Some(buf))
360381
}
361-
.instrument(read_block_span)
362382
.await?;
363383
match buf {
364384
Some(buf) => Ok((Arc::new(buf), e_tag)),
@@ -444,13 +464,19 @@ impl S3Storage {
444464
) -> BoxFuture<'static, Result<ByteStream, StorageError>>,
445465
options: PutOptions,
446466
) -> Result<Option<ETag>, StorageError> {
447-
if self.is_oneshot_upload(total_size_bytes) {
448-
return self
449-
.oneshot_upload(key, total_size_bytes, create_bytestream_fn, options)
450-
.await;
451-
}
452-
self.multipart_upload(key, total_size_bytes, create_bytestream_fn, options)
453-
.await
467+
let res = {
468+
if self.is_oneshot_upload(total_size_bytes) {
469+
self.oneshot_upload(key, total_size_bytes, create_bytestream_fn, options)
470+
.await
471+
} else {
472+
self.multipart_upload(key, total_size_bytes, create_bytestream_fn, options)
473+
.await
474+
}
475+
};
476+
let trace_id = get_current_trace_id().to_string();
477+
let attribute = [KeyValue::new("trace_id", trace_id)];
478+
self.metrics.num_put_requests.record(1, &attribute);
479+
res
454480
}
455481

456482
#[tracing::instrument(skip(self, create_bytestream_fn), level = "trace")]
@@ -492,6 +518,10 @@ impl S3Storage {
492518
}
493519
}
494520
})?;
521+
522+
let trace_id = get_current_trace_id().to_string();
523+
let attribute = [KeyValue::new("trace_id", trace_id)];
524+
self.metrics.num_put_requests.record(1, &attribute);
495525
Ok(resp.e_tag.map(ETag))
496526
}
497527

@@ -568,6 +598,9 @@ impl S3Storage {
568598
source: Arc::new(err.into_service_error()),
569599
})?;
570600

601+
let trace_id = get_current_trace_id().to_string();
602+
let attribute = [KeyValue::new("trace_id", trace_id)];
603+
self.metrics.num_put_requests.record(1, &attribute);
571604
Ok(CompletedPart::builder()
572605
.e_tag(upload_part_res.e_tag.unwrap_or_default())
573606
.part_number(part_number)
@@ -656,7 +689,7 @@ impl S3Storage {
656689
None => req,
657690
};
658691

659-
match req.send().await {
692+
let res = match req.send().await {
660693
Ok(_) => {
661694
tracing::trace!(key = %key, "Successfully deleted object from S3");
662695
Ok(())
@@ -667,7 +700,12 @@ impl S3Storage {
667700
source: Arc::new(e.into_service_error()),
668701
})
669702
}
670-
}
703+
};
704+
705+
let trace_id = get_current_trace_id().to_string();
706+
let attribute = [KeyValue::new("trace_id", trace_id)];
707+
self.metrics.num_delete_requests.record(1, &attribute);
708+
res
671709
}
672710

673711
#[tracing::instrument(skip(self), level = "trace")]

rust/tracing/src/util.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -90,3 +90,8 @@ impl Drop for Stopwatch<'_> {
9090
self.0.record(elapsed, self.1);
9191
}
9292
}
93+
94+
/// Returns the OpenTelemetry trace id attached to the current `tracing` span.
///
/// This is the shared form of the lookup that the frontend's response-tracing
/// layer previously performed inline; callers typically call `.to_string()` on
/// the result to use it as a header value or metric attribute.
///
/// NOTE(review): when no span is active, or the span has no OpenTelemetry
/// context, this presumably yields the invalid (all-zero) trace id rather
/// than failing — confirm against the `tracing-opentelemetry` documentation.
pub fn get_current_trace_id() -> TraceId {
95+
// The span the calling code is currently executing in.
let span = tracing::Span::current();
96+
// tracing span -> OpenTelemetry context -> OTel span -> span context -> id.
span.context().span().span_context().trace_id()
97+
}

0 commit comments

Comments
 (0)