[BUG] Segmentation fault in get_block #4336

Merged
2 commits merged on Apr 21, 2025
9 changes: 8 additions & 1 deletion rust/blockstore/src/arrow/blockfile.rs
@@ -409,7 +409,14 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
return Err(e);
}
};
- self.loaded_blocks.write().insert(block_id, Box::new(block));
+ // Don't reinsert if someone else has already inserted it:
+ // replacing the entry would drop the old Box<Block>, leaving every
+ // previously returned reference to this block dangling (use-after-free).
+ let mut write_guard = self.loaded_blocks.write();
+ if let Some(block) = write_guard.get(&block_id) {
+     return Ok(Some(unsafe { transmute::<&Block, &Block>(&**block) }));
+ }
+ write_guard.insert(block_id, Box::new(block));
Collaborator:
I think ideally block_manager should return a heap-allocated type, that would kill the rc
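
One reading of that suggestion: have the cache hand back an owned, heap-allocated handle such as `Arc<Block>` instead of a lifetime-extended `&Block`. A rough sketch under that assumption (the `BlockCache`, `get_or_insert`, and `fetch` names are illustrative, not the actual block_manager API, and a `parking_lot`-style `RwLock` is assumed):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;
use uuid::Uuid;

struct Block; // stand-in for the real arrow Block

struct BlockCache {
    loaded_blocks: RwLock<HashMap<Uuid, Arc<Block>>>,
}

impl BlockCache {
    /// Hand out an owned handle instead of a transmuted `&Block`.
    /// If another thread replaces the entry later, existing readers
    /// still hold a live Arc, so nothing dangles.
    fn get_or_insert(&self, block_id: Uuid, fetch: impl FnOnce() -> Block) -> Arc<Block> {
        if let Some(block) = self.loaded_blocks.read().get(&block_id) {
            return block.clone();
        }
        let fetched = Arc::new(fetch());
        let mut guard = self.loaded_blocks.write();
        // A racing reader may have inserted while we were fetching;
        // keep its copy rather than overwriting it.
        guard.entry(block_id).or_insert(fetched).clone()
    }
}
```

Because every caller owns a strong reference, a later replacement of the map entry only drops one refcount, and the `unsafe { transmute(...) }` goes away.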

}

if let Some(block) = self.loaded_blocks.read().get(&block_id) {
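The hazard the new check guards against is that `HashMap::insert` on an existing key drops the old `Box<Block>`, so every `&Block` previously handed out via the lifetime-extending `transmute` starts dangling. A minimal, self-contained illustration of that invalidation (toy types, not code from this repository):

```rust
use std::collections::HashMap;

fn main() {
    let mut cache: HashMap<u32, Box<String>> = HashMap::new();
    cache.insert(1, Box::new("block".to_string()));

    // A reader grabs a pointer into the boxed value (get_block hands out
    // the analogous &Block after a lifetime-extending transmute).
    let before: *const String = &**cache.get(&1).unwrap();

    // A racing reader re-inserts the same id: the old Box is dropped and
    // its allocation freed while `before` still points at it.
    cache.insert(1, Box::new("block".to_string()));
    let after: *const String = &**cache.get(&1).unwrap();

    // The value moved to a new allocation; dereferencing `before` now would
    // be a use-after-free, the segfault reported in this issue.
    assert_ne!(before, after);
}
```

The fix avoids this by checking for an existing entry under the write lock and returning a reference to that entry instead of overwriting it.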
63 changes: 63 additions & 0 deletions rust/blockstore/src/arrow/concurrency_test.rs
@@ -85,4 +85,67 @@ mod tests {
}
});
}

    #[test]
    fn test_concurrent_readers() {
        let mut config = shuttle::Config::default();
        config.stack_size = 1024 * 1024; // 1MB

        let scheduler = RandomScheduler::new(100);
        let runner = Runner::new(scheduler, config);

        runner.run(|| {
            let tmp_dir = tempfile::tempdir().unwrap();
            let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
            let block_cache = new_cache_for_test();
            let sparse_index_cache = new_cache_for_test();
            let blockfile_provider = ArrowBlockfileProvider::new(
                storage,
                TEST_MAX_BLOCK_SIZE_BYTES,
                block_cache,
                sparse_index_cache,
            );
            let reader = future::block_on(async {
                let writer = blockfile_provider
                    .write::<&str, u32>(BlockfileWriterOptions::default())
                    .await
                    .expect("Failed to create writer");
                let id = writer.id();
                writer
                    .set::<&str, u32>("", "key1", 1)
                    .await
                    .expect("Failed to set key");
                let flusher = writer.commit::<&str, Vec<u32>>().await.unwrap();
                flusher.flush::<&str, u32>().await.unwrap();

                // Clear cache.
                blockfile_provider.clear().await.expect("Clear bf provider");

                blockfile_provider.read::<&str, u32>(&id).await.unwrap()
            });
            // Make the max threads the number of cores * 2
            let max_threads = num_cpus::get() * 2;
            let t = shuttle::rand::thread_rng().gen_range(2..max_threads);
            let mut join_handles = Vec::with_capacity(t);
            for _ in 0..t {
                let reader_clone = reader.clone();
                let handle = thread::spawn(move || {
                    future::block_on(async {
                        reader_clone
                            .get("", "key1")
                            .await
                            .expect("Expected value")
                            .expect("Expected value")
                    })
                });
                join_handles.push(handle);
            }

            // No errors.
            for handle in join_handles {
                let val = handle.join().unwrap();
                assert_eq!(val, 1);
            }
        });
    }
}
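
For context on the harness used above: shuttle swaps in instrumented thread and sync primitives so its scheduler controls every interleaving, and `RandomScheduler::new(100)` with `Runner` re-runs the closure under 100 random schedules. The same shape can be written with shuttle's `check_random` helper; a minimal, self-contained sketch (an unrelated toy counter, just to show the pattern):

```rust
use std::sync::Arc;

use shuttle::sync::Mutex;

fn main() {
    // Run the closure under 100 random interleavings, analogous to
    // RandomScheduler::new(100) + Runner in the test above.
    shuttle::check_random(
        || {
            let value = Arc::new(Mutex::new(0u32));
            let handles: Vec<_> = (0..2)
                .map(|_| {
                    let value = Arc::clone(&value);
                    shuttle::thread::spawn(move || {
                        *value.lock().unwrap() += 1;
                    })
                })
                .collect();
            for handle in handles {
                handle.join().unwrap();
            }
            // The invariant must hold on every explored schedule.
            assert_eq!(*value.lock().unwrap(), 2);
        },
        100,
    );
}
```

When an assertion fails, shuttle panics with the failing schedule, which can then be replayed deterministically while debugging.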