Skip to content

Commit 38fb531

Browse files
authored
[BUG] Lock BF writer (#2189)
## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Locks the BF writer for write operations since we need to make these properly thread safe - New functionality - None ## Test plan *How are these changes tested?* - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None
1 parent 7092001 commit 38fb531

File tree

1 file changed

+7
-0
lines changed

1 file changed

+7
-0
lines changed

rust/worker/src/blockstore/arrow/blockfile.rs

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub(crate) struct ArrowBlockfileWriter {
2424
block_deltas: Arc<Mutex<HashMap<Uuid, BlockDelta>>>,
2525
sparse_index: SparseIndex,
2626
id: Uuid,
27+
write_mutex: Arc<tokio::sync::Mutex<()>>,
2728
}
2829
// TODO: method visibility should not be pub(crate)
2930

@@ -62,6 +63,7 @@ impl ArrowBlockfileWriter {
6263
block_deltas: block_deltas,
6364
sparse_index: sparse_index,
6465
id,
66+
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
6567
}
6668
}
6769

@@ -78,6 +80,7 @@ impl ArrowBlockfileWriter {
7880
block_deltas: block_deltas,
7981
sparse_index: new_sparse_index,
8082
id,
83+
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
8184
}
8285
}
8386

@@ -110,6 +113,9 @@ impl ArrowBlockfileWriter {
110113
key: K,
111114
value: V,
112115
) -> Result<(), Box<dyn ChromaError>> {
116+
// TODO: for now the BF writer locks the entire write operation
117+
let _guard = self.write_mutex.lock().await;
118+
113119
// TODO: value must be smaller than the block size except for position lists, which are a special case
114120
// // where we split the value across multiple blocks
115121
// if !self.in_transaction() {
@@ -173,6 +179,7 @@ impl ArrowBlockfileWriter {
173179
prefix: &str,
174180
key: K,
175181
) -> Result<(), Box<dyn ChromaError>> {
182+
let _guard = self.write_mutex.lock().await;
176183
// Get the target block id for the key
177184
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
178185
let target_block_id = self.sparse_index.get_target_block_id(&search_key);

0 commit comments

Comments
 (0)