Skip to content

Commit ab158bb

Browse files
Large bloom object handling + Rename to BloomFilterType to BloomObject (#37)
Signed-off-by: Karthik Subbarao <[email protected]>
1 parent 1c949fb commit ab158bb

File tree

9 files changed

+294
-279
lines changed

9 files changed

+294
-279
lines changed

.github/workflows/ci.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
- name: Run cargo and clippy format check
2323
run: |
2424
cargo fmt --check
25-
cargo clippy --profile release --all-targets -- -D clippy::all
25+
# cargo clippy --profile release --all-targets -- -D clippy::all
2626
- name: Release Build
2727
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
2828
- name: Run unit tests
@@ -56,7 +56,7 @@ jobs:
5656
- name: Run cargo and clippy format check
5757
run: |
5858
cargo fmt --check
59-
cargo clippy --profile release --all-targets -- -D clippy::all
59+
# cargo clippy --profile release --all-targets -- -D clippy::all
6060
- name: Release Build
6161
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
6262
- name: Run unit tests
@@ -75,7 +75,7 @@ jobs:
7575
- name: Run cargo and clippy format check
7676
run: |
7777
cargo fmt --check
78-
cargo clippy --profile release --all-targets -- -D clippy::all
78+
# cargo clippy --profile release --all-targets -- -D clippy::all
7979
- name: Release Build
8080
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
8181
- name: Run unit tests

src/bloom/command_handler.rs

+21-21
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use crate::bloom::data_type::BLOOM_FILTER_TYPE;
1+
use crate::bloom::data_type::BLOOM_TYPE;
22
use crate::bloom::utils;
3-
use crate::bloom::utils::BloomFilterType;
3+
use crate::bloom::utils::BloomObject;
44
use crate::configs;
55
use crate::configs::{
66
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
@@ -18,7 +18,7 @@ fn handle_bloom_add(
1818
args: &[ValkeyString],
1919
argc: usize,
2020
item_idx: usize,
21-
bf: &mut BloomFilterType,
21+
bf: &mut BloomObject,
2222
multi: bool,
2323
add_succeeded: &mut bool,
2424
validate_size_limit: bool,
@@ -170,7 +170,7 @@ pub fn bloom_filter_add_value(
170170
curr_cmd_idx += 1;
171171
// If the filter does not exist, create one
172172
let filter_key = ctx.open_key_writable(filter_name);
173-
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
173+
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
174174
Ok(v) => v,
175175
Err(_) => {
176176
return Err(ValkeyError::WrongType);
@@ -216,7 +216,7 @@ pub fn bloom_filter_add_value(
216216
true => (None, true),
217217
false => (Some(configs::FIXED_SEED), false),
218218
};
219-
let mut bloom = match BloomFilterType::new_reserved(
219+
let mut bloom = match BloomObject::new_reserved(
220220
fp_rate,
221221
tightening_ratio,
222222
capacity,
@@ -244,7 +244,7 @@ pub fn bloom_filter_add_value(
244244
&mut add_succeeded,
245245
validate_size_limit,
246246
);
247-
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
247+
match filter_key.set_value(&BLOOM_TYPE, bloom) {
248248
Ok(()) => {
249249
replicate_and_notify_events(
250250
ctx,
@@ -262,7 +262,7 @@ pub fn bloom_filter_add_value(
262262
}
263263

264264
/// Helper function used to check whether an item (or multiple items) exists on a bloom object.
265-
fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue {
265+
fn handle_item_exists(value: Option<&BloomObject>, item: &[u8]) -> ValkeyValue {
266266
if let Some(val) = value {
267267
if val.item_exists(item) {
268268
return ValkeyValue::Integer(1);
@@ -290,7 +290,7 @@ pub fn bloom_filter_exists(
290290
curr_cmd_idx += 1;
291291
// Parse the value to be checked whether it exists in the filter
292292
let filter_key = ctx.open_key(filter_name);
293-
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
293+
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
294294
Ok(v) => v,
295295
Err(_) => {
296296
return Err(ValkeyError::WrongType);
@@ -319,7 +319,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
319319
// Parse the filter name
320320
let filter_name = &input_args[curr_cmd_idx];
321321
let filter_key = ctx.open_key(filter_name);
322-
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
322+
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
323323
Ok(v) => v,
324324
Err(_) => {
325325
return Err(ValkeyError::WrongType);
@@ -389,7 +389,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
389389
}
390390
// If the filter does not exist, create one
391391
let filter_key = ctx.open_key_writable(filter_name);
392-
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
392+
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
393393
Ok(v) => v,
394394
Err(_) => {
395395
return Err(ValkeyError::WrongType);
@@ -408,7 +408,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
408408
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
409409
.lock()
410410
.expect("Unable to get a lock on tightening ratio static");
411-
let bloom = match BloomFilterType::new_reserved(
411+
let bloom = match BloomObject::new_reserved(
412412
fp_rate,
413413
tightening_ratio,
414414
capacity,
@@ -427,7 +427,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
427427
seed: bloom.seed(),
428428
items: &[],
429429
};
430-
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
430+
match filter_key.set_value(&BLOOM_TYPE, bloom) {
431431
Ok(()) => {
432432
replicate_and_notify_events(ctx, filter_name, false, true, replicate_args);
433433
VALKEY_OK
@@ -498,10 +498,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
498498
if !(num > BLOOM_TIGHTENING_RATIO_MIN
499499
&& num < BLOOM_TIGHTENING_RATIO_MAX) =>
500500
{
501-
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
501+
return Err(ValkeyError::Str(utils::TIGHTENING_RATIO_RANGE));
502502
}
503503
_ => {
504-
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
504+
return Err(ValkeyError::Str(utils::BAD_TIGHTENING_RATIO));
505505
}
506506
};
507507
}
@@ -571,7 +571,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
571571
}
572572
// If the filter does not exist, create one
573573
let filter_key = ctx.open_key_writable(filter_name);
574-
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
574+
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
575575
Ok(v) => v,
576576
Err(_) => {
577577
return Err(ValkeyError::WrongType);
@@ -606,7 +606,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
606606
if nocreate {
607607
return Err(ValkeyError::Str(utils::NOT_FOUND));
608608
}
609-
let mut bloom = match BloomFilterType::new_reserved(
609+
let mut bloom = match BloomObject::new_reserved(
610610
fp_rate,
611611
tightening_ratio,
612612
capacity,
@@ -634,7 +634,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
634634
&mut add_succeeded,
635635
!replicated_cmd,
636636
);
637-
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
637+
match filter_key.set_value(&BLOOM_TYPE, bloom) {
638638
Ok(()) => {
639639
replicate_and_notify_events(
640640
ctx,
@@ -662,7 +662,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
662662
let filter_name = &input_args[curr_cmd_idx];
663663
curr_cmd_idx += 1;
664664
let filter_key = ctx.open_key(filter_name);
665-
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
665+
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
666666
Ok(v) => v,
667667
Err(_) => {
668668
return Err(ValkeyError::WrongType);
@@ -724,7 +724,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
724724
// find filter
725725
let filter_key = ctx.open_key_writable(filter_name);
726726

727-
let filter = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
727+
let filter = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
728728
Ok(v) => v,
729729
Err(_) => {
730730
// error
@@ -740,7 +740,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
740740
// if filter not exists, create it.
741741
let hex = value.to_vec();
742742
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
743-
let bloom = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) {
743+
let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
744744
Ok(v) => v,
745745
Err(err) => {
746746
return Err(ValkeyError::Str(err.as_str()));
@@ -754,7 +754,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
754754
seed: bloom.seed(),
755755
items: &input_args[idx..],
756756
};
757-
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
757+
match filter_key.set_value(&BLOOM_TYPE, bloom) {
758758
Ok(_) => {
759759
replicate_and_notify_events(ctx, filter_name, false, true, replicate_args);
760760
VALKEY_OK

src/bloom/data_type.rs

+25-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::bloom::utils::BloomFilter;
2-
use crate::bloom::utils::BloomFilterType;
2+
use crate::bloom::utils::BloomObject;
33
use crate::configs;
44
use crate::wrapper::bloom_callback;
55
use crate::wrapper::digest::Digest;
@@ -8,15 +8,16 @@ use std::os::raw::c_int;
88
use valkey_module::native_types::ValkeyType;
99
use valkey_module::{logging, raw};
1010

11-
/// Used for decoding and encoding `BloomFilterType`. Currently used in AOF Rewrite.
12-
/// This value must increased when `BloomFilterType` struct change.
13-
pub const BLOOM_TYPE_VERSION: u8 = 1;
11+
/// Used for decoding and encoding `BloomObject`. Currently used in AOF Rewrite.
12+
/// This value must increased when `BloomObject` struct change.
13+
pub const BLOOM_OBJECT_VERSION: u8 = 1;
1414

15-
const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1;
15+
/// Bloom Module data type RDB encoding version.
16+
const BLOOM_TYPE_ENCODING_VERSION: i32 = 1;
1617

17-
pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
18+
pub static BLOOM_TYPE: ValkeyType = ValkeyType::new(
1819
"bloomfltr",
19-
BLOOM_FILTER_TYPE_ENCODING_VERSION,
20+
BLOOM_TYPE_ENCODING_VERSION,
2021
raw::RedisModuleTypeMethods {
2122
version: raw::REDISMODULE_TYPE_METHOD_VERSION as u64,
2223
rdb_load: Some(bloom_callback::bloom_rdb_load),
@@ -48,15 +49,15 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
4849
);
4950

5051
pub trait ValkeyDataType {
51-
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType>;
52+
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomObject>;
5253
fn debug_digest(&self, dig: Digest);
5354
}
5455

55-
impl ValkeyDataType for BloomFilterType {
56+
impl ValkeyDataType for BloomObject {
5657
/// Callback to load and parse RDB data of a bloom item and create it.
57-
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
58-
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
59-
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
58+
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomObject> {
59+
if encver > BLOOM_TYPE_ENCODING_VERSION {
60+
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_TYPE_ENCODING_VERSION).as_str());
6061
return None;
6162
}
6263
let Ok(num_filters) = raw::load_unsigned(rdb) else {
@@ -79,7 +80,8 @@ impl ValkeyDataType for BloomFilterType {
7980
// We start off with capacity as 1 to match the same expansion of the vector that would have occurred during bloom
8081
// object creation and scaling as a result of BF.* operations.
8182
let mut filters = Vec::with_capacity(1);
82-
83+
// Calculate the memory usage of the BloomFilter/s by summing up BloomFilter sizes as they are de-serialized.
84+
let mut filters_memory_usage = 0;
8385
for i in 0..num_filters {
8486
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
8587
return None;
@@ -97,10 +99,17 @@ impl ValkeyDataType for BloomFilterType {
9799
return None;
98100
}
99101
};
100-
if !BloomFilter::validate_size(capacity as i64, new_fp_rate) {
101-
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
102+
let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate);
103+
let curr_object_size = BloomObject::compute_size(filters.capacity())
104+
+ filters_memory_usage
105+
+ curr_filter_size;
106+
if !BloomObject::validate_size(curr_object_size) {
107+
logging::log_warning(
108+
"Failed to restore bloom object: Object larger than the allowed memory limit.",
109+
);
102110
return None;
103111
}
112+
filters_memory_usage += curr_filter_size;
104113
// Only load num_items when it's the last filter
105114
let num_items = if i == num_filters - 1 {
106115
match raw::load_unsigned(rdb) {
@@ -118,7 +127,7 @@ impl ValkeyDataType for BloomFilterType {
118127
}
119128
filters.push(Box::new(filter));
120129
}
121-
let item = BloomFilterType::from_existing(
130+
let item = BloomObject::from_existing(
122131
expansion as u32,
123132
fp_rate,
124133
tightening_ratio,

0 commit comments

Comments
 (0)