Skip to content

Commit a2e007b

Browse files
committed
Fixing ATLEASTCAPACITY calculation as well as adding MAXCAPACITY functionality for info
Signed-off-by: zackcam <[email protected]>
1 parent 2be839e commit a2e007b

File tree

6 files changed

+222
-52
lines changed

6 files changed

+222
-52
lines changed

src/bloom/command_handler.rs

+45-10
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
462462
true => (None, true),
463463
false => (Some(configs::FIXED_SEED), false),
464464
};
465-
let mut wanted_capacity = -1;
465+
let mut validate_scale_to = -1;
466466
let mut nocreate = false;
467467
let mut items_provided = false;
468468
while idx < argc {
@@ -554,12 +554,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
554554
}
555555
};
556556
}
557-
"ATLEASTCAPACITY" => {
557+
"VALIDATESCALETO" => {
558558
if idx >= (argc - 1) {
559559
return Err(ValkeyError::WrongArity);
560560
}
561561
idx += 1;
562-
wanted_capacity = match input_args[idx].to_string_lossy().parse::<i64>() {
562+
validate_scale_to = match input_args[idx].to_string_lossy().parse::<i64>() {
563563
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
564564
Ok(0) => {
565565
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
@@ -584,25 +584,25 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
584584
// When the `ITEMS` argument is provided, we expect additional item arg/s to be provided.
585585
return Err(ValkeyError::WrongArity);
586586
}
587-
// Check if we have a wanted capacity and calculate if we can reach that capacity
588-
if wanted_capacity > 0 {
587+
// Check if we have a wanted capacity and calculate if we can reach that capacity. Using VALIDATESCALETO and NONSCALING options together is invalid.
588+
if validate_scale_to > 0 {
589589
if expansion == 0 {
590590
return Err(ValkeyError::Str(
591-
utils::NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID,
591+
utils::NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID,
592592
));
593593
}
594-
match utils::BloomObject::calculate_if_wanted_capacity_is_valid(
594+
match utils::BloomObject::calculate_max_scaled_capacity(
595595
capacity,
596596
fp_rate,
597-
wanted_capacity,
597+
validate_scale_to,
598598
tightening_ratio,
599599
expansion,
600600
) {
601-
Ok(result) => result,
601+
Ok(_) => (),
602602
Err(e) => {
603603
return Err(e);
604604
}
605-
}
605+
};
606606
}
607607
// If the filter does not exist, create one
608608
let filter_key = ctx.open_key_writable(filter_name);
@@ -714,12 +714,29 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
714714
"SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)),
715715
"FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)),
716716
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
717+
"ERROR" => Ok(ValkeyValue::Float(val.fp_rate())),
717718
"EXPANSION" => {
718719
if val.expansion() == 0 {
719720
return Ok(ValkeyValue::Null);
720721
}
721722
Ok(ValkeyValue::Integer(val.expansion() as i64))
722723
}
724+
// Only calculate and expose MAXSCALEDCAPACITY for scaling bloom objects.
725+
"MAXSCALEDCAPACITY" if val.expansion() > 0 => {
726+
let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity(
727+
val.starting_capacity(),
728+
val.fp_rate(),
729+
-1,
730+
val.tightening_ratio(),
731+
val.expansion(),
732+
) {
733+
Ok(result) => result,
734+
Err(e) => {
735+
return Err(e);
736+
}
737+
};
738+
Ok(ValkeyValue::Integer(max_capacity))
739+
}
723740
_ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)),
724741
}
725742
}
@@ -733,13 +750,31 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
733750
ValkeyValue::Integer(val.num_filters() as i64),
734751
ValkeyValue::SimpleStringStatic("Number of items inserted"),
735752
ValkeyValue::Integer(val.cardinality()),
753+
ValkeyValue::SimpleStringStatic("Error rate"),
754+
ValkeyValue::Float(val.fp_rate()),
736755
ValkeyValue::SimpleStringStatic("Expansion rate"),
737756
];
738757
if val.expansion() == 0 {
739758
result.push(ValkeyValue::Null);
740759
} else {
741760
result.push(ValkeyValue::Integer(val.expansion() as i64));
742761
}
762+
if val.expansion() != 0 {
763+
let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity(
764+
val.starting_capacity(),
765+
val.fp_rate(),
766+
-1,
767+
val.tightening_ratio(),
768+
val.expansion(),
769+
) {
770+
Ok(result) => result,
771+
Err(e) => {
772+
return Err(e);
773+
}
774+
};
775+
result.push(ValkeyValue::SimpleStringStatic("Max scaled capacity"));
776+
result.push(ValkeyValue::Integer(max_capacity));
777+
}
743778
Ok(ValkeyValue::Array(result))
744779
}
745780
_ => Err(ValkeyError::Str(utils::NOT_FOUND)),

src/bloom/data_type.rs

+9-10
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,15 @@ impl ValkeyDataType for BloomObject {
8686
let Ok(capacity) = raw::load_unsigned(rdb) else {
8787
return None;
8888
};
89-
let new_fp_rate =
90-
match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) {
91-
Ok(rate) => rate,
92-
Err(_) => {
93-
logging::log_warning(
94-
"Failed to restore bloom object: Reached max number of filters",
95-
);
96-
return None;
97-
}
98-
};
89+
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, i as i32, tightening_ratio) {
90+
Ok(rate) => rate,
91+
Err(_) => {
92+
logging::log_warning(
93+
"Failed to restore bloom object: Reached max number of filters",
94+
);
95+
return None;
96+
}
97+
};
9998
let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate);
10099
let curr_object_size = BloomObject::compute_size(filters.capacity())
101100
+ filters_memory_usage

src/bloom/utils.rs

+131-23
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,20 @@ pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
3030
pub const BAD_TIGHTENING_RATIO: &str = "ERR bad tightening ratio";
3131
pub const TIGHTENING_RATIO_RANGE: &str = "ERR (0 < tightening ratio range < 1)";
3232
pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
33-
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
33+
pub const FALSE_POSITIVE_DEGRADES_TO_O: &str = "ERR false positive degrades to 0 on scale out";
3434
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
3535
pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit";
36-
pub const WANTED_CAPACITY_EXCEEDS_MAX_SIZE: &str =
37-
"ERR Wanted capacity would go beyond bloom object memory limit";
38-
pub const WANTED_CAPACITY_FALSE_POSITIVE_INVALID: &str =
39-
"ERR False positive degrades too much to reach wanted capacity";
36+
pub const VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: &str =
37+
"ERR provided VALIDATESCALETO causes bloom object to exceed memory limit";
38+
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
39+
pub const VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: &str =
40+
"ERR provided VALIDATESCALETO causes false positive to degrade to 0";
4041
pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists.";
4142
pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed";
4243
pub const DECODE_UNSUPPORTED_VERSION: &str =
4344
"ERR bloom object decoding failed. Unsupported version";
44-
pub const NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID: &str =
45-
"ERR Specifying NONSCALING and ATLEASTCAPCITY is not allowed";
45+
pub const NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID: &str =
46+
"ERR cannot use NONSCALING and VALIDATESCALETO options together";
4647
/// Logging Error messages
4748
pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object.";
4849

@@ -56,6 +57,8 @@ pub enum BloomError {
5657
DecodeUnsupportedVersion,
5758
ErrorRateRange,
5859
BadExpansion,
60+
FalsePositiveReachesZero,
61+
BadCapacity,
5962
}
6063

6164
impl BloomError {
@@ -69,6 +72,8 @@ impl BloomError {
6972
BloomError::DecodeUnsupportedVersion => DECODE_UNSUPPORTED_VERSION,
7073
BloomError::ErrorRateRange => ERROR_RATE_RANGE,
7174
BloomError::BadExpansion => BAD_EXPANSION,
75+
BloomError::FalsePositiveReachesZero => FALSE_POSITIVE_DEGRADES_TO_O,
76+
BloomError::BadCapacity => BAD_CAPACITY,
7277
}
7378
}
7479
}
@@ -248,6 +253,13 @@ impl BloomObject {
248253
.expect("Every BloomObject is expected to have at least one filter")
249254
.seed()
250255
}
256+
/// Return the starting capacity used by the Bloom object. This capacity is held within the first filter
257+
pub fn starting_capacity(&self) -> i64 {
258+
self.filters
259+
.first()
260+
.expect("Every BloomObject is expected to have at least one filter")
261+
.capacity()
262+
}
251263

252264
/// Return the expansion of the bloom object.
253265
pub fn expansion(&self) -> u32 {
@@ -319,7 +331,7 @@ impl BloomObject {
319331
Some(new_capacity) => new_capacity,
320332
None => {
321333
// u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9).
322-
return Err(BloomError::MaxNumScalingFilters);
334+
return Err(BloomError::BadCapacity);
323335
}
324336
};
325337
// Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
@@ -373,7 +385,7 @@ impl BloomObject {
373385
) -> Result<f64, BloomError> {
374386
match fp_rate * tightening_ratio.powi(num_filters) {
375387
x if x > f64::MIN_POSITIVE => Ok(x),
376-
_ => Err(BloomError::MaxNumScalingFilters),
388+
_ => Err(BloomError::FalsePositiveReachesZero),
377389
}
378390
}
379391

@@ -463,42 +475,76 @@ impl BloomObject {
463475
}
464476
}
465477

466-
pub fn calculate_if_wanted_capacity_is_valid(
478+
/// This method is called from two different bloom commands: BF.INFO and BF.INSERT. The functionality varies slightly on which command it
479+
/// is called from. When called from BF.INFO, this method is used to find the maximum possible size that the bloom object could scale to
480+
/// without throwing an error. When called from BF.INSERT, this method is used to determine if it is possible to reach the provided `validate_scale_to`.
481+
///
482+
/// # Arguments
483+
///
484+
/// * `capacity` - The size of the initial filter in the bloom object.
485+
/// * `fp_rate` - the false positive rate for the bloom object
486+
/// * `validate_scale_to` - the capacity we check to see if it can scale to. If this method is called from BF.INFO this is set as -1 as we
487+
/// want to check the maximum size we could scale up till
488+
/// * `tightening_ratio` - The tightening ratio of the object
489+
/// * `expansion` - The expanison rate of the object
490+
///
491+
/// # Returns
492+
/// * i64 - The maximum capacity that can be reached if called from BF.INFO. If called from BF.INSERT the size it reached when it became greater than `validate_scale_to`
493+
/// * ValkeyError - Can return two different errors:
494+
/// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit
495+
/// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0
496+
pub fn calculate_max_scaled_capacity(
467497
capacity: i64,
468498
fp_rate: f64,
469-
wanted_capacity: i64,
499+
validate_scale_to: i64,
470500
tightening_ratio: f64,
471501
expansion: u32,
472-
) -> Result<(), ValkeyError> {
473-
let mut curr_capacity = capacity;
474-
let mut curr_num_filters: u64 = 1;
475-
let mut curr_fp_rate = fp_rate;
502+
) -> Result<i64, ValkeyError> {
503+
let mut curr_filter_capacity = capacity;
504+
let mut curr_total_capacity = 0;
505+
let mut curr_num_filters: u64 = 0;
476506
let mut filters_memory_usage = 0;
477-
while curr_capacity < wanted_capacity {
478-
curr_fp_rate = match BloomObject::calculate_fp_rate(
479-
curr_fp_rate,
507+
while curr_total_capacity < validate_scale_to || validate_scale_to == -1 {
508+
// Check to see if scaling to the next filter will cause a degradation in FP to 0
509+
let curr_fp_rate = match BloomObject::calculate_fp_rate(
510+
fp_rate,
480511
curr_num_filters as i32,
481512
tightening_ratio,
482513
) {
483514
Ok(rate) => rate,
484515
Err(_) => {
485-
return Err(ValkeyError::Str(WANTED_CAPACITY_FALSE_POSITIVE_INVALID));
516+
if validate_scale_to == -1 {
517+
return Ok(curr_total_capacity);
518+
}
519+
return Err(ValkeyError::Str(VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID));
486520
}
487521
};
488-
let curr_filter_size = BloomFilter::compute_size(curr_capacity, curr_fp_rate);
522+
// Check that if it scales to this number of filters that the object won't exceed the memory limit
523+
let curr_filter_size = BloomFilter::compute_size(curr_filter_capacity, curr_fp_rate);
489524
// For vectors of size < 4 the capacity of the vector is 4. However after that the capacity is always a power of two above or equal to the size
490525
let curr_object_size = BloomObject::compute_size(
491526
std::cmp::max(4, curr_num_filters).next_power_of_two() as usize,
492527
) + filters_memory_usage
493528
+ curr_filter_size;
494529
if !BloomObject::validate_size(curr_object_size) {
495-
return Err(ValkeyError::Str(WANTED_CAPACITY_EXCEEDS_MAX_SIZE));
530+
if validate_scale_to == -1 {
531+
return Ok(curr_total_capacity);
532+
}
533+
return Err(ValkeyError::Str(VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE));
496534
}
535+
// Update overall memory usage
497536
filters_memory_usage += curr_filter_size;
498-
curr_capacity *= expansion as i64;
537+
curr_total_capacity += curr_filter_capacity;
538+
curr_filter_capacity = match curr_filter_capacity.checked_mul(expansion.into()) {
539+
Some(new_capacity) => new_capacity,
540+
None => {
541+
// u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9).
542+
return Err(ValkeyError::Str(BAD_CAPACITY));
543+
}
544+
};
499545
curr_num_filters += 1;
500546
}
501-
Ok(())
547+
Ok(curr_total_capacity)
502548
}
503549
}
504550

@@ -658,6 +704,7 @@ impl Drop for BloomFilter {
658704
#[cfg(test)]
659705
mod tests {
660706
use super::*;
707+
use crate::configs::TIGHTENING_RATIO_DEFAULT;
661708
use configs;
662709
use rand::{distributions::Alphanumeric, Rng};
663710
use rstest::rstest;
@@ -1006,6 +1053,10 @@ mod tests {
10061053
let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
10071054
let test_seed2 = test_bloom_filter2.seed();
10081055
assert_ne!(test_seed2, configs::FIXED_SEED);
1056+
// Check that the random seed changes for each BloomFilter
1057+
let test_bloom_filter3 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
1058+
let test_seed3 = test_bloom_filter3.seed();
1059+
assert_ne!(test_seed2, test_seed3);
10091060
}
10101061

10111062
#[test]
@@ -1024,6 +1075,63 @@ mod tests {
10241075
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
10251076
}
10261077

1078+
#[rstest]
1079+
#[case(1000, 0.01, 10000, 2, 15000)]
1080+
#[case(10000, 0.001, 100000, 4, 210000)]
1081+
#[case(50000, 0.0001, 500000, 3, 650000)]
1082+
#[case(100000, 0.00001, 1000000, 2, 1500000)]
1083+
#[case(100, 0.00001, 1000, 1, 1000)]
1084+
fn test_calculate_max_scaled_capacity(
1085+
#[case] capacity: i64,
1086+
#[case] fp_rate: f64,
1087+
#[case] validate_scale_to: i64,
1088+
#[case] expansion: u32,
1089+
#[case] resulting_size: i64,
1090+
) {
1091+
// Validate that max scaled capacity returns the correct capacity reached when a valid can_scale to is provided
1092+
let returned_size = BloomObject::calculate_max_scaled_capacity(
1093+
capacity,
1094+
fp_rate,
1095+
validate_scale_to,
1096+
TIGHTENING_RATIO_DEFAULT
1097+
.parse()
1098+
.expect("global config should always be 0.5"),
1099+
expansion,
1100+
);
1101+
assert_eq!(resulting_size, returned_size.unwrap());
1102+
if expansion == 1 {
1103+
// Validate that when false positive rate reaches 0 we get the correct error message returned
1104+
let failed_returned_size = BloomObject::calculate_max_scaled_capacity(
1105+
capacity,
1106+
fp_rate,
1107+
validate_scale_to * 10000,
1108+
TIGHTENING_RATIO_DEFAULT
1109+
.parse()
1110+
.expect("global config should always be 0.5"),
1111+
expansion,
1112+
);
1113+
assert!(failed_returned_size
1114+
.unwrap_err()
1115+
.to_string()
1116+
.contains("provided VALIDATESCALETO causes false positive to degrade to 0"));
1117+
} else {
1118+
// Validate that when going over the max capacity in validate_scale_to then we get the correct error message returned
1119+
let failed_returned_size = BloomObject::calculate_max_scaled_capacity(
1120+
capacity,
1121+
fp_rate,
1122+
validate_scale_to * 10000,
1123+
TIGHTENING_RATIO_DEFAULT
1124+
.parse()
1125+
.expect("global config should always be 0.5"),
1126+
expansion,
1127+
);
1128+
assert!(failed_returned_size
1129+
.unwrap_err()
1130+
.to_string()
1131+
.contains("provided VALIDATESCALETO causes bloom object to exceed memory limit"));
1132+
}
1133+
}
1134+
10271135
#[rstest(expansion, case::nonscaling(0), case::scaling(2))]
10281136
fn test_bf_encode_and_decode(expansion: u32) {
10291137
let mut bf =

0 commit comments

Comments
 (0)