Skip to content

Commit b842838

Browse files
Tanuj NayakTanuj Nayak
authored andcommitted
make it just block level metrics
1 parent 216fc54 commit b842838

File tree

2 files changed

+9
-55
lines changed

2 files changed

+9
-55
lines changed

rust/blockstore/src/arrow/provider.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ pub struct BlockMetrics {
270270
pub commit_latency: opentelemetry::metrics::Histogram<u64>,
271271
pub num_blocks_flushed: opentelemetry::metrics::Histogram<u64>,
272272
pub flush_latency: opentelemetry::metrics::Histogram<u64>,
273+
pub num_get_requests: opentelemetry::metrics::Histogram<u64>,
273274
}
274275

275276
impl Default for BlockMetrics {
@@ -291,6 +292,11 @@ impl Default for BlockMetrics {
291292
.with_description("Flush latency")
292293
.with_unit("microseconds")
293294
.build(),
295+
num_get_requests: meter
296+
.u64_histogram("block_num_cold_get_requests")
297+
.with_description("Number of cold block get requests")
298+
.with_unit("requests")
299+
.build(),
294300
}
295301
}
296302
}
@@ -388,6 +394,9 @@ impl BlockManager {
388394
.await;
389395
match bytes_res {
390396
Ok(bytes) => {
397+
let trace_id = get_current_trace_id().to_string();
398+
let attribute = [KeyValue::new("trace_id", trace_id)];
399+
self.block_metrics.num_get_requests.record(1, &attribute);
391400
let deserialization_span = tracing::trace_span!(parent: Span::current(), "BlockManager deserialize block");
392401
let block =
393402
deserialization_span.in_scope(|| Block::from_bytes(&bytes, *id));

rust/storage/src/s3.rs

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,11 @@ use bytes::Bytes;
2929
use chroma_config::registry::Registry;
3030
use chroma_config::Configurable;
3131
use chroma_error::ChromaError;
32-
use chroma_tracing::util::get_current_trace_id;
3332
use futures::future::BoxFuture;
3433
use futures::stream;
3534
use futures::FutureExt;
3635
use futures::Stream;
3736
use futures::StreamExt;
38-
use opentelemetry::metrics::Meter;
39-
use opentelemetry::KeyValue;
4037
use rand::Rng;
4138
use std::clone::Clone;
4239
use std::ops::Range;
@@ -45,42 +42,12 @@ use std::time::Duration;
4542
use tokio::io::AsyncReadExt;
4643
use tracing::Instrument;
4744

48-
#[derive(Clone, Debug)]
49-
pub struct S3StorageMetrics {
50-
num_get_requests: opentelemetry::metrics::Histogram<u64>,
51-
num_put_requests: opentelemetry::metrics::Histogram<u64>,
52-
num_delete_requests: opentelemetry::metrics::Histogram<u64>,
53-
}
54-
55-
impl S3StorageMetrics {
56-
pub fn new(meter: Meter) -> Self {
57-
Self {
58-
num_get_requests: meter
59-
.u64_histogram("s3_num_get_requests")
60-
.with_description("The number of GET requests to S3.")
61-
.with_unit("requests")
62-
.build(),
63-
num_put_requests: meter
64-
.u64_histogram("s3_num_put_requests")
65-
.with_description("The number of PUT requests to S3.")
66-
.with_unit("requests")
67-
.build(),
68-
num_delete_requests: meter
69-
.u64_histogram("s3_num_delete_requests")
70-
.with_description("The number of DELETE requests to S3.")
71-
.with_unit("requests")
72-
.build(),
73-
}
74-
}
75-
}
76-
7745
#[derive(Clone)]
7846
pub struct S3Storage {
7947
pub(super) bucket: String,
8048
pub(super) client: aws_sdk_s3::Client,
8149
pub(super) upload_part_size_bytes: usize,
8250
pub(super) download_part_size_bytes: usize,
83-
pub(super) metrics: S3StorageMetrics,
8451
}
8552

8653
impl S3Storage {
@@ -96,7 +63,6 @@ impl S3Storage {
9663
client,
9764
upload_part_size_bytes,
9865
download_part_size_bytes,
99-
metrics: S3StorageMetrics::new(opentelemetry::global::meter("chroma")),
10066
}
10167
}
10268

@@ -159,9 +125,6 @@ impl S3Storage {
159125
.instrument(tracing::trace_span!("cold S3 get"))
160126
.await;
161127

162-
let trace_id = get_current_trace_id().to_string();
163-
let attribute = [KeyValue::new("trace_id", trace_id)];
164-
self.metrics.num_get_requests.record(1, &attribute);
165128
match res {
166129
Ok(res) => {
167130
let byte_stream = res.body;
@@ -217,9 +180,6 @@ impl S3Storage {
217180
.key(key)
218181
.send()
219182
.await;
220-
let trace_id = get_current_trace_id().to_string();
221-
let attribute = [KeyValue::new("trace_id", trace_id)];
222-
self.metrics.num_get_requests.record(1, &attribute);
223183
let (content_length, e_tag) = match head_res {
224184
Ok(res) => match res.content_length {
225185
Some(len) => (len, res.e_tag),
@@ -265,9 +225,6 @@ impl S3Storage {
265225
.send()
266226
.instrument(tracing::trace_span!("cold S3 get"))
267227
.await;
268-
let trace_id = get_current_trace_id().to_string();
269-
let attribute = [KeyValue::new("trace_id", trace_id)];
270-
self.metrics.num_get_requests.record(1, &attribute);
271228
match res {
272229
Ok(output) => Ok(output),
273230
Err(e) => {
@@ -473,9 +430,6 @@ impl S3Storage {
473430
.await
474431
}
475432
};
476-
let trace_id = get_current_trace_id().to_string();
477-
let attribute = [KeyValue::new("trace_id", trace_id)];
478-
self.metrics.num_put_requests.record(1, &attribute);
479433
res
480434
}
481435

@@ -519,9 +473,6 @@ impl S3Storage {
519473
}
520474
})?;
521475

522-
let trace_id = get_current_trace_id().to_string();
523-
let attribute = [KeyValue::new("trace_id", trace_id)];
524-
self.metrics.num_put_requests.record(1, &attribute);
525476
Ok(resp.e_tag.map(ETag))
526477
}
527478

@@ -598,9 +549,6 @@ impl S3Storage {
598549
source: Arc::new(err.into_service_error()),
599550
})?;
600551

601-
let trace_id = get_current_trace_id().to_string();
602-
let attribute = [KeyValue::new("trace_id", trace_id)];
603-
self.metrics.num_put_requests.record(1, &attribute);
604552
Ok(CompletedPart::builder()
605553
.e_tag(upload_part_res.e_tag.unwrap_or_default())
606554
.part_number(part_number)
@@ -702,9 +650,6 @@ impl S3Storage {
702650
}
703651
};
704652

705-
let trace_id = get_current_trace_id().to_string();
706-
let attribute = [KeyValue::new("trace_id", trace_id)];
707-
self.metrics.num_delete_requests.record(1, &attribute);
708653
res
709654
}
710655

0 commit comments

Comments
 (0)