Skip to content

Commit 6203deb

Browse files
authored
[ENH] Add orchestration for count (#2180)
## Description of changes *Summarize the changes made by this PR.* - New functionality: introduces the orchestrator for count, along with basic counting logic that simply adds the number of log records to the number of records in the blockfile. More advanced merging, such as deduplicating adds and updates and removing deletes, will be handled in a separate PR. ## Test plan - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --------- Co-authored-by: Sanket Kedia <[email protected]>
1 parent 4075243 commit 6203deb

File tree

6 files changed

+490
-10
lines changed

6 files changed

+490
-10
lines changed

rust/worker/src/blockstore/arrow/blockfile.rs

+10-4
Original file line numberDiff line numberDiff line change
@@ -277,11 +277,17 @@ impl<'me, K: ArrowReadableKey<'me>, V: ArrowReadableValue<'me>> ArrowBlockfileRe
277277

278278
// Count the total number of records.
279279
pub(crate) async fn count(&self) -> Result<usize, Box<dyn ChromaError>> {
280+
let mut block_ids: Vec<Uuid> = vec![];
281+
{
282+
let lock_guard = self.sparse_index.forward.lock();
283+
let mut curr_iter = lock_guard.iter();
284+
while let Some((_, block_id)) = curr_iter.next() {
285+
block_ids.push(block_id.clone());
286+
}
287+
}
280288
let mut result: usize = 0;
281-
let lock_guard = self.sparse_index.forward.lock();
282-
let mut curr_iter = lock_guard.iter();
283-
while let Some((_, block_id)) = curr_iter.next() {
284-
let block = self.get_block(*block_id).await;
289+
for block_id in block_ids {
290+
let block = self.get_block(block_id).await;
285291
match block {
286292
Some(b) => result = result + b.len(),
287293
None => {

rust/worker/src/blockstore/types.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub(crate) enum BlockfileError {
2727
TransactionInProgress,
2828
#[error("Transaction not in progress")]
2929
TransactionNotInProgress,
30+
#[error("Block not found")]
31+
BlockNotFound,
3032
}
3133

3234
impl ChromaError for BlockfileError {
@@ -38,6 +40,7 @@ impl ChromaError for BlockfileError {
3840
BlockfileError::TransactionInProgress | BlockfileError::TransactionNotInProgress => {
3941
ErrorCodes::FailedPrecondition
4042
}
43+
BlockfileError::BlockNotFound => ErrorCodes::Internal,
4144
}
4245
}
4346
}
@@ -297,7 +300,17 @@ impl<
297300
pub(crate) async fn count(&'referred_data self) -> Result<usize, Box<dyn ChromaError>> {
298301
match self {
299302
BlockfileReader::MemoryBlockfileReader(reader) => reader.count(),
300-
BlockfileReader::ArrowBlockfileReader(reader) => reader.count().await,
303+
BlockfileReader::ArrowBlockfileReader(reader) => {
304+
let count = reader.count().await;
305+
match count {
306+
Ok(c) => {
307+
return Ok(c);
308+
}
309+
Err(_) => {
310+
return Err(Box::new(BlockfileError::BlockNotFound));
311+
}
312+
}
313+
}
301314
}
302315
}
303316

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
use thiserror::Error;
2+
3+
use tonic::async_trait;
4+
5+
use crate::{
6+
blockstore::provider::BlockfileProvider,
7+
errors::{ChromaError, ErrorCodes},
8+
execution::operator::Operator,
9+
segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
10+
types::Segment,
11+
};
12+
13+
/// Operator that counts the records of a record segment.
///
/// The type carries no state of its own; the segment definition and the
/// blockfile provider arrive through [`CountRecordsInput`] when it is run.
#[derive(Debug)]
pub(crate) struct CountRecordsOperator {}

impl CountRecordsOperator {
    /// Creates a new, heap-allocated operator.
    pub(crate) fn new() -> Box<Self> {
        let operator = Self {};
        Box::new(operator)
    }
}
21+
22+
#[derive(Debug)]
23+
pub(crate) struct CountRecordsInput {
24+
record_segment_definition: Segment,
25+
blockfile_provider: BlockfileProvider,
26+
}
27+
28+
impl CountRecordsInput {
29+
pub(crate) fn new(
30+
record_segment_definition: Segment,
31+
blockfile_provider: BlockfileProvider,
32+
) -> Self {
33+
Self {
34+
record_segment_definition,
35+
blockfile_provider,
36+
}
37+
}
38+
}
39+
40+
/// Result produced by a successful run of the count-records operator.
#[derive(Debug)]
41+
pub(crate) struct CountRecordsOutput {
42+
/// Record count as reported by the record segment reader.
pub(crate) count: usize,
43+
}
44+
45+
#[derive(Error, Debug)]
46+
pub(crate) enum CountRecordsError {
47+
#[error("Error reading record segment reader")]
48+
RecordSegmentReadError,
49+
#[error("Error creating record segment reader")]
50+
RecordSegmentError(#[from] RecordSegmentReaderCreationError),
51+
}
52+
53+
impl ChromaError for CountRecordsError {
54+
fn code(&self) -> ErrorCodes {
55+
match self {
56+
CountRecordsError::RecordSegmentError(_) => ErrorCodes::Internal,
57+
CountRecordsError::RecordSegmentReadError => ErrorCodes::Internal,
58+
}
59+
}
60+
}
61+
62+
#[async_trait]
63+
impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {
64+
type Error = CountRecordsError;
65+
async fn run(
66+
&self,
67+
input: &CountRecordsInput,
68+
) -> Result<CountRecordsOutput, CountRecordsError> {
69+
let segment_reader = RecordSegmentReader::from_segment(
70+
&input.record_segment_definition,
71+
&input.blockfile_provider,
72+
)
73+
.await;
74+
match segment_reader {
75+
Ok(reader) => match reader.count().await {
76+
Ok(val) => {
77+
return Ok(CountRecordsOutput { count: val });
78+
}
79+
Err(_) => {
80+
println!("Error reading record segment");
81+
return Err(CountRecordsError::RecordSegmentReadError);
82+
}
83+
},
84+
Err(e) => {
85+
println!("Error opening record segment");
86+
return Err(CountRecordsError::RecordSegmentError(*e));
87+
}
88+
}
89+
}
90+
}

rust/worker/src/execution/operators/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub(super) mod brute_force_knn;
2+
pub(super) mod count_records;
23
pub(super) mod flush_s3;
34
pub(super) mod hnsw_knn;
45
pub(super) mod merge_knn_results;

0 commit comments

Comments
 (0)