Skip to content

[HOTFIX] applying PR #4855 to release/2025-06-13 #4860

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rust/log-service/proptest-regressions/lib.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 224805d1b23398d59a037d3ae192e5d32fc114957b64ea0b8209d67ef642035b # shrinks to operations_before_seal = [], operations_after_migrate = [OperationRecord { id: "", document: None, embedding: Some("[...]"), metadata: None, operation: Add }]
cc 6c09d8d5a095fde4c6263f24e208c2db1ac203d8f8f7d899573659268255c239 # shrinks to initial_operations = [], source_operations = [], fork_operations = []
cc 85a395fbfca69e7c9197fce89252b273c98389e3f5a773c443f405639f137e49 # shrinks to operations = [(0, OperationRecord { id: "", document: None, embedding: Some("[...]"), metadata: None, operation: Add })]
cc 9fbbfa4c87cfe3d41253eadb2bc048aa36bc939208cbe23e3ee5eb8094df91d4 # shrinks to read_offset = 1, batch_size = 1, operations = [OperationRecord { id: "", document: None, embedding: Some("[...]"), metadata: None, operation: Add }]
cc ba6cef35853bca82d7527a7f42b2dbb8a6d60f3084f249993b56cf945fd83b0e # shrinks to read_offset = 1, batch_size = 1, operations = [OperationRecord { id: "", document: None, embedding: Some("[...]"), metadata: None, operation: Add }]
cc bec924432fdfb2bd91273126b0cd30300a6988be3bbb3cf4ceb928eb8a37bacd # shrinks to initial_operations = [], source_operations = [], fork_operations = []
81 changes: 49 additions & 32 deletions rust/log-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,18 @@ struct RollupPerCollection {
}

impl RollupPerCollection {
fn new(first_observation: LogPosition, num_records: u64) -> Self {
fn new(
first_observation: LogPosition,
num_records: u64,
initial_insertion_epoch_us: u64,
) -> Self {
Self {
start_log_position: first_observation,
limit_log_position: LogPosition::from_offset(
first_observation.offset().saturating_add(num_records),
),
reinsert_count: 0,
initial_insertion_epoch_us: 0,
initial_insertion_epoch_us,
}
}

Expand All @@ -329,14 +333,15 @@ impl RollupPerCollection {
if log_position < self.start_log_position {
self.start_log_position = log_position;
}
if log_position + num_records > self.limit_log_position {
self.limit_log_position = log_position + num_records;
if log_position.offset().saturating_add(num_records) > self.limit_log_position.offset() {
self.limit_log_position =
LogPosition::from_offset(log_position.offset().saturating_add(num_records));
}
// Take the biggest reinsert count.
self.reinsert_count = std::cmp::max(self.reinsert_count, reinsert_count);
// Consider the earliest initial insertion time so if we've compacted earlier we drop.
self.initial_insertion_epoch_us =
std::cmp::max(self.initial_insertion_epoch_us, initial_insertion_epoch_us);
std::cmp::min(self.initial_insertion_epoch_us, initial_insertion_epoch_us);
}

fn witness_cursor(&mut self, witness: Option<&Witness>) {
Expand All @@ -349,6 +354,8 @@ impl RollupPerCollection {
self.start_log_position = witness
.map(|x| x.1.position)
.unwrap_or(LogPosition::from_offset(1));
assert!(self.start_log_position <= self.limit_log_position);
self.limit_log_position = self.limit_log_position.max(self.start_log_position);
}

fn is_empty(&self) -> bool {
Expand All @@ -359,14 +366,20 @@ impl RollupPerCollection {
DirtyMarker::MarkDirty {
collection_id,
log_position: self.start_log_position,
num_records: self.limit_log_position - self.start_log_position,
reinsert_count: self.reinsert_count,
num_records: self
.limit_log_position
.offset()
.saturating_sub(self.start_log_position.offset()),
reinsert_count: self.reinsert_count.saturating_add(1),
initial_insertion_epoch_us: self.initial_insertion_epoch_us,
}
}

fn requires_backpressure(&self, threshold: u64) -> bool {
self.limit_log_position - self.start_log_position >= threshold
self.limit_log_position
.offset()
.saturating_sub(self.start_log_position.offset())
>= threshold
}
}

Expand Down Expand Up @@ -430,9 +443,13 @@ impl DirtyMarker {
reinsert_count,
initial_insertion_epoch_us,
} => {
let position = rollups
.entry(*collection_id)
.or_insert_with(|| RollupPerCollection::new(*log_position, *num_records));
let position = rollups.entry(*collection_id).or_insert_with(|| {
RollupPerCollection::new(
*log_position,
*num_records,
*initial_insertion_epoch_us,
)
});
position.observe_dirty_marker(
*log_position,
*num_records,
Expand Down Expand Up @@ -2461,22 +2478,21 @@ mod tests {
fn rollup_per_collection_new() {
let start_position = LogPosition::from_offset(10);
let num_records = 5;
let rollup = RollupPerCollection::new(start_position, num_records);
let rollup = RollupPerCollection::new(start_position, num_records, 0);

assert_eq!(start_position, rollup.start_log_position);
assert_eq!(LogPosition::from_offset(15), rollup.limit_log_position);
assert_eq!(0, rollup.reinsert_count);
assert_eq!(0, rollup.initial_insertion_epoch_us);
}

#[test]
fn rollup_per_collection_observe_dirty_marker() {
let start_position = LogPosition::from_offset(10);
let mut rollup = RollupPerCollection::new(start_position, 5);
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_micros() as u64;
let mut rollup = RollupPerCollection::new(start_position, 5, now);

// Observe a marker that extends the range
rollup.observe_dirty_marker(LogPosition::from_offset(20), 10, 3, now);
Expand All @@ -2489,22 +2505,22 @@ mod tests {
rollup.observe_dirty_marker(LogPosition::from_offset(5), 2, 1, now - 1000);
assert_eq!(LogPosition::from_offset(5), rollup.start_log_position);
assert_eq!(LogPosition::from_offset(30), rollup.limit_log_position);
assert_eq!(3, rollup.reinsert_count); // Should keep max
assert_eq!(now, rollup.initial_insertion_epoch_us); // Should keep max
assert_eq!(3, rollup.reinsert_count); // Same
assert_eq!(now - 1000, rollup.initial_insertion_epoch_us); // Should move to min
}

#[test]
fn rollup_per_collection_is_empty() {
let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 0);
let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 0, 42);
assert!(rollup.is_empty());

let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5);
let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, 42);
assert!(!rollup.is_empty());
}

#[test]
fn rollup_per_collection_requires_backpressure() {
let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 100);
let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 100, 42);
assert!(rollup.requires_backpressure(50));
assert!(!rollup.requires_backpressure(150));
assert!(rollup.requires_backpressure(100)); // Equal case
Expand All @@ -2518,7 +2534,7 @@ mod tests {
.unwrap()
.as_micros() as u64;

let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5);
let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, now);
rollup.observe_dirty_marker(LogPosition::from_offset(10), 5, 2, now);

let marker = rollup.dirty_marker(collection_id);
Expand All @@ -2533,7 +2549,7 @@ mod tests {
assert_eq!(collection_id, cid);
assert_eq!(LogPosition::from_offset(10), log_position);
assert_eq!(5, num_records);
assert_eq!(2, reinsert_count);
assert_eq!(3, reinsert_count);
assert_eq!(now, initial_insertion_epoch_us);
}
_ => panic!("Expected MarkDirty variant"),
Expand Down Expand Up @@ -2674,7 +2690,7 @@ mod tests {
assert_eq!(LogPosition::from_offset(10), rollup1.start_log_position);
assert_eq!(LogPosition::from_offset(33), rollup1.limit_log_position);
assert_eq!(1, rollup1.reinsert_count); // max of 1 and 0
assert_eq!(now, rollup1.initial_insertion_epoch_us); // max of now and now-1000
assert_eq!(now - 1000, rollup1.initial_insertion_epoch_us); // min of now and now-1000

// Check collection_id2 rollup
let rollup2 = rollup.get(&collection_id2).unwrap();
Expand Down Expand Up @@ -2827,7 +2843,7 @@ mod tests {

#[test]
fn rollup_per_collection_witness_functionality() {
let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5);
let rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, 42);

// Test that the rollup can handle boundary conditions
assert_eq!(LogPosition::from_offset(10), rollup.start_log_position);
Expand All @@ -2837,11 +2853,11 @@ mod tests {

#[test]
fn rollup_per_collection_backpressure_boundary_conditions() {
let rollup = RollupPerCollection::new(LogPosition::from_offset(0), u64::MAX);
let rollup = RollupPerCollection::new(LogPosition::from_offset(0), u64::MAX, 42);
assert!(rollup.requires_backpressure(u64::MAX - 1));
assert!(rollup.requires_backpressure(u64::MAX));

let rollup = RollupPerCollection::new(LogPosition::from_offset(u64::MAX - 100), 50);
let rollup = RollupPerCollection::new(LogPosition::from_offset(u64::MAX - 100), 50, 42);
assert!(!rollup.requires_backpressure(100));
assert!(rollup.requires_backpressure(25));
}
Expand Down Expand Up @@ -2997,11 +3013,11 @@ mod tests {

#[test]
fn rollup_per_collection_gap_handling() {
let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5);
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_micros() as u64;
let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, now + 1);

rollup.observe_dirty_marker(LogPosition::from_offset(20), 5, 1, now);

Expand Down Expand Up @@ -3059,7 +3075,7 @@ mod tests {
collection_rollup.limit_log_position
);
assert_eq!(99, collection_rollup.reinsert_count);
assert_eq!(now + 999, collection_rollup.initial_insertion_epoch_us);
assert_eq!(now, collection_rollup.initial_insertion_epoch_us);
}

#[test]
Expand Down Expand Up @@ -3118,7 +3134,7 @@ mod tests {
#[test]
fn rollup_per_collection_extreme_positions() {
let start_position = LogPosition::from_offset(u64::MAX - 10);
let rollup = RollupPerCollection::new(start_position, 5);
let rollup = RollupPerCollection::new(start_position, 5, 42);

assert_eq!(start_position, rollup.start_log_position);
assert!(!rollup.is_empty());
Expand All @@ -3127,7 +3143,7 @@ mod tests {

#[test]
fn rollup_per_collection_zero_epoch() {
let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5);
let mut rollup = RollupPerCollection::new(LogPosition::from_offset(10), 5, u64::MAX);

rollup.observe_dirty_marker(LogPosition::from_offset(15), 5, 1, 0);

Expand Down Expand Up @@ -3193,23 +3209,24 @@ mod tests {

#[test]
fn rollup_per_collection_edge_case_positions() {
let mut rollup = RollupPerCollection::new(LogPosition::from_offset(100), 0);
let mut rollup = RollupPerCollection::new(LogPosition::from_offset(100), 0, 1042);

rollup.observe_dirty_marker(LogPosition::from_offset(50), 25, 1, 1000);

assert_eq!(LogPosition::from_offset(50), rollup.start_log_position);
assert_eq!(LogPosition::from_offset(100), rollup.limit_log_position);
assert_eq!(1000, rollup.initial_insertion_epoch_us);
}

#[test]
fn backpressure_threshold_verification() {
let rollup = RollupPerCollection::new(LogPosition::from_offset(0), 100);
let rollup = RollupPerCollection::new(LogPosition::from_offset(0), 100, 42);

assert!(rollup.requires_backpressure(99));
assert!(rollup.requires_backpressure(100));
assert!(!rollup.requires_backpressure(101));

let zero_rollup = RollupPerCollection::new(LogPosition::from_offset(10), 0);
let zero_rollup = RollupPerCollection::new(LogPosition::from_offset(10), 0, 42);
assert!(!zero_rollup.requires_backpressure(1));
assert!(zero_rollup.requires_backpressure(0));
}
Expand Down
Loading