@@ -308,14 +308,18 @@ struct RollupPerCollection {
308
308
}
309
309
310
310
impl RollupPerCollection {
311
- fn new ( first_observation : LogPosition , num_records : u64 ) -> Self {
311
+ fn new (
312
+ first_observation : LogPosition ,
313
+ num_records : u64 ,
314
+ initial_insertion_epoch_us : u64 ,
315
+ ) -> Self {
312
316
Self {
313
317
start_log_position : first_observation,
314
318
limit_log_position : LogPosition :: from_offset (
315
319
first_observation. offset ( ) . saturating_add ( num_records) ,
316
320
) ,
317
321
reinsert_count : 0 ,
318
- initial_insertion_epoch_us : 0 ,
322
+ initial_insertion_epoch_us,
319
323
}
320
324
}
321
325
@@ -329,14 +333,15 @@ impl RollupPerCollection {
329
333
if log_position < self . start_log_position {
330
334
self . start_log_position = log_position;
331
335
}
332
- if log_position + num_records > self . limit_log_position {
333
- self . limit_log_position = log_position + num_records;
336
+ if log_position. offset ( ) . saturating_add ( num_records) > self . limit_log_position . offset ( ) {
337
+ self . limit_log_position =
338
+ LogPosition :: from_offset ( log_position. offset ( ) . saturating_add ( num_records) ) ;
334
339
}
335
340
// Take the biggest reinsert count.
336
341
self . reinsert_count = std:: cmp:: max ( self . reinsert_count , reinsert_count) ;
337
342
// Consider the most recent initial insertion time so if we've compacted earlier we drop.
338
343
self . initial_insertion_epoch_us =
339
- std:: cmp:: max ( self . initial_insertion_epoch_us , initial_insertion_epoch_us) ;
344
+ std:: cmp:: min ( self . initial_insertion_epoch_us , initial_insertion_epoch_us) ;
340
345
}
341
346
342
347
fn witness_cursor ( & mut self , witness : Option < & Witness > ) {
@@ -349,6 +354,8 @@ impl RollupPerCollection {
349
354
self . start_log_position = witness
350
355
. map ( |x| x. 1 . position )
351
356
. unwrap_or ( LogPosition :: from_offset ( 1 ) ) ;
357
+ assert ! ( self . start_log_position <= self . limit_log_position) ;
358
+ self . limit_log_position = self . limit_log_position . max ( self . start_log_position ) ;
352
359
}
353
360
354
361
fn is_empty ( & self ) -> bool {
@@ -359,14 +366,20 @@ impl RollupPerCollection {
359
366
DirtyMarker :: MarkDirty {
360
367
collection_id,
361
368
log_position : self . start_log_position ,
362
- num_records : self . limit_log_position - self . start_log_position ,
363
- reinsert_count : self . reinsert_count ,
369
+ num_records : self
370
+ . limit_log_position
371
+ . offset ( )
372
+ . saturating_sub ( self . start_log_position . offset ( ) ) ,
373
+ reinsert_count : self . reinsert_count . saturating_add ( 1 ) ,
364
374
initial_insertion_epoch_us : self . initial_insertion_epoch_us ,
365
375
}
366
376
}
367
377
368
378
fn requires_backpressure ( & self , threshold : u64 ) -> bool {
369
- self . limit_log_position - self . start_log_position >= threshold
379
+ self . limit_log_position
380
+ . offset ( )
381
+ . saturating_sub ( self . start_log_position . offset ( ) )
382
+ >= threshold
370
383
}
371
384
}
372
385
@@ -430,9 +443,13 @@ impl DirtyMarker {
430
443
reinsert_count,
431
444
initial_insertion_epoch_us,
432
445
} => {
433
- let position = rollups
434
- . entry ( * collection_id)
435
- . or_insert_with ( || RollupPerCollection :: new ( * log_position, * num_records) ) ;
446
+ let position = rollups. entry ( * collection_id) . or_insert_with ( || {
447
+ RollupPerCollection :: new (
448
+ * log_position,
449
+ * num_records,
450
+ * initial_insertion_epoch_us,
451
+ )
452
+ } ) ;
436
453
position. observe_dirty_marker (
437
454
* log_position,
438
455
* num_records,
@@ -2461,22 +2478,21 @@ mod tests {
2461
2478
fn rollup_per_collection_new ( ) {
2462
2479
let start_position = LogPosition :: from_offset ( 10 ) ;
2463
2480
let num_records = 5 ;
2464
- let rollup = RollupPerCollection :: new ( start_position, num_records) ;
2481
+ let rollup = RollupPerCollection :: new ( start_position, num_records, 0 ) ;
2465
2482
2466
2483
assert_eq ! ( start_position, rollup. start_log_position) ;
2467
2484
assert_eq ! ( LogPosition :: from_offset( 15 ) , rollup. limit_log_position) ;
2468
2485
assert_eq ! ( 0 , rollup. reinsert_count) ;
2469
- assert_eq ! ( 0 , rollup. initial_insertion_epoch_us) ;
2470
2486
}
2471
2487
2472
2488
#[ test]
2473
2489
fn rollup_per_collection_observe_dirty_marker ( ) {
2474
2490
let start_position = LogPosition :: from_offset ( 10 ) ;
2475
- let mut rollup = RollupPerCollection :: new ( start_position, 5 ) ;
2476
2491
let now = SystemTime :: now ( )
2477
2492
. duration_since ( SystemTime :: UNIX_EPOCH )
2478
2493
. unwrap ( )
2479
2494
. as_micros ( ) as u64 ;
2495
+ let mut rollup = RollupPerCollection :: new ( start_position, 5 , now) ;
2480
2496
2481
2497
// Observe a marker that extends the range
2482
2498
rollup. observe_dirty_marker ( LogPosition :: from_offset ( 20 ) , 10 , 3 , now) ;
@@ -2489,22 +2505,22 @@ mod tests {
2489
2505
rollup. observe_dirty_marker ( LogPosition :: from_offset ( 5 ) , 2 , 1 , now - 1000 ) ;
2490
2506
assert_eq ! ( LogPosition :: from_offset( 5 ) , rollup. start_log_position) ;
2491
2507
assert_eq ! ( LogPosition :: from_offset( 30 ) , rollup. limit_log_position) ;
2492
- assert_eq ! ( 3 , rollup. reinsert_count) ; // Should keep max
2493
- assert_eq ! ( now, rollup. initial_insertion_epoch_us) ; // Should keep max
2508
+ assert_eq ! ( 3 , rollup. reinsert_count) ; // Same
2509
+ assert_eq ! ( now - 1000 , rollup. initial_insertion_epoch_us) ; // Should move to min
2494
2510
}
2495
2511
2496
2512
#[ test]
2497
2513
fn rollup_per_collection_is_empty ( ) {
2498
- let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 0 ) ;
2514
+ let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 0 , 42 ) ;
2499
2515
assert ! ( rollup. is_empty( ) ) ;
2500
2516
2501
- let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 ) ;
2517
+ let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 , 42 ) ;
2502
2518
assert ! ( !rollup. is_empty( ) ) ;
2503
2519
}
2504
2520
2505
2521
#[ test]
2506
2522
fn rollup_per_collection_requires_backpressure ( ) {
2507
- let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 100 ) ;
2523
+ let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 100 , 42 ) ;
2508
2524
assert ! ( rollup. requires_backpressure( 50 ) ) ;
2509
2525
assert ! ( !rollup. requires_backpressure( 150 ) ) ;
2510
2526
assert ! ( rollup. requires_backpressure( 100 ) ) ; // Equal case
@@ -2518,7 +2534,7 @@ mod tests {
2518
2534
. unwrap ( )
2519
2535
. as_micros ( ) as u64 ;
2520
2536
2521
- let mut rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 ) ;
2537
+ let mut rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 , now ) ;
2522
2538
rollup. observe_dirty_marker ( LogPosition :: from_offset ( 10 ) , 5 , 2 , now) ;
2523
2539
2524
2540
let marker = rollup. dirty_marker ( collection_id) ;
@@ -2533,7 +2549,7 @@ mod tests {
2533
2549
assert_eq ! ( collection_id, cid) ;
2534
2550
assert_eq ! ( LogPosition :: from_offset( 10 ) , log_position) ;
2535
2551
assert_eq ! ( 5 , num_records) ;
2536
- assert_eq ! ( 2 , reinsert_count) ;
2552
+ assert_eq ! ( 3 , reinsert_count) ;
2537
2553
assert_eq ! ( now, initial_insertion_epoch_us) ;
2538
2554
}
2539
2555
_ => panic ! ( "Expected MarkDirty variant" ) ,
@@ -2674,7 +2690,7 @@ mod tests {
2674
2690
assert_eq ! ( LogPosition :: from_offset( 10 ) , rollup1. start_log_position) ;
2675
2691
assert_eq ! ( LogPosition :: from_offset( 33 ) , rollup1. limit_log_position) ;
2676
2692
assert_eq ! ( 1 , rollup1. reinsert_count) ; // max of 1 and 0
2677
- assert_eq ! ( now, rollup1. initial_insertion_epoch_us) ; // max of now and now-1000
2693
+ assert_eq ! ( now - 1000 , rollup1. initial_insertion_epoch_us) ; // max of now and now-1000
2678
2694
2679
2695
// Check collection_id2 rollup
2680
2696
let rollup2 = rollup. get ( & collection_id2) . unwrap ( ) ;
@@ -2827,7 +2843,7 @@ mod tests {
2827
2843
2828
2844
#[ test]
2829
2845
fn rollup_per_collection_witness_functionality ( ) {
2830
- let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 ) ;
2846
+ let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 , 42 ) ;
2831
2847
2832
2848
// Test that the rollup can handle boundary conditions
2833
2849
assert_eq ! ( LogPosition :: from_offset( 10 ) , rollup. start_log_position) ;
@@ -2837,11 +2853,11 @@ mod tests {
2837
2853
2838
2854
#[ test]
2839
2855
fn rollup_per_collection_backpressure_boundary_conditions ( ) {
2840
- let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 0 ) , u64:: MAX ) ;
2856
+ let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 0 ) , u64:: MAX , 42 ) ;
2841
2857
assert ! ( rollup. requires_backpressure( u64 :: MAX - 1 ) ) ;
2842
2858
assert ! ( rollup. requires_backpressure( u64 :: MAX ) ) ;
2843
2859
2844
- let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( u64:: MAX - 100 ) , 50 ) ;
2860
+ let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( u64:: MAX - 100 ) , 50 , 42 ) ;
2845
2861
assert ! ( !rollup. requires_backpressure( 100 ) ) ;
2846
2862
assert ! ( rollup. requires_backpressure( 25 ) ) ;
2847
2863
}
@@ -2997,11 +3013,11 @@ mod tests {
2997
3013
2998
3014
#[ test]
2999
3015
fn rollup_per_collection_gap_handling ( ) {
3000
- let mut rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 ) ;
3001
3016
let now = SystemTime :: now ( )
3002
3017
. duration_since ( SystemTime :: UNIX_EPOCH )
3003
3018
. unwrap ( )
3004
3019
. as_micros ( ) as u64 ;
3020
+ let mut rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 , now + 1 ) ;
3005
3021
3006
3022
rollup. observe_dirty_marker ( LogPosition :: from_offset ( 20 ) , 5 , 1 , now) ;
3007
3023
@@ -3059,7 +3075,7 @@ mod tests {
3059
3075
collection_rollup. limit_log_position
3060
3076
) ;
3061
3077
assert_eq ! ( 99 , collection_rollup. reinsert_count) ;
3062
- assert_eq ! ( now + 999 , collection_rollup. initial_insertion_epoch_us) ;
3078
+ assert_eq ! ( now, collection_rollup. initial_insertion_epoch_us) ;
3063
3079
}
3064
3080
3065
3081
#[ test]
@@ -3118,7 +3134,7 @@ mod tests {
3118
3134
#[ test]
3119
3135
fn rollup_per_collection_extreme_positions ( ) {
3120
3136
let start_position = LogPosition :: from_offset ( u64:: MAX - 10 ) ;
3121
- let rollup = RollupPerCollection :: new ( start_position, 5 ) ;
3137
+ let rollup = RollupPerCollection :: new ( start_position, 5 , 42 ) ;
3122
3138
3123
3139
assert_eq ! ( start_position, rollup. start_log_position) ;
3124
3140
assert ! ( !rollup. is_empty( ) ) ;
@@ -3127,7 +3143,7 @@ mod tests {
3127
3143
3128
3144
#[ test]
3129
3145
fn rollup_per_collection_zero_epoch ( ) {
3130
- let mut rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 ) ;
3146
+ let mut rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 5 , u64 :: MAX ) ;
3131
3147
3132
3148
rollup. observe_dirty_marker ( LogPosition :: from_offset ( 15 ) , 5 , 1 , 0 ) ;
3133
3149
@@ -3193,23 +3209,24 @@ mod tests {
3193
3209
3194
3210
#[ test]
3195
3211
fn rollup_per_collection_edge_case_positions ( ) {
3196
- let mut rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 100 ) , 0 ) ;
3212
+ let mut rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 100 ) , 0 , 1042 ) ;
3197
3213
3198
3214
rollup. observe_dirty_marker ( LogPosition :: from_offset ( 50 ) , 25 , 1 , 1000 ) ;
3199
3215
3200
3216
assert_eq ! ( LogPosition :: from_offset( 50 ) , rollup. start_log_position) ;
3201
3217
assert_eq ! ( LogPosition :: from_offset( 100 ) , rollup. limit_log_position) ;
3218
+ assert_eq ! ( 1000 , rollup. initial_insertion_epoch_us) ;
3202
3219
}
3203
3220
3204
3221
#[ test]
3205
3222
fn backpressure_threshold_verification ( ) {
3206
- let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 0 ) , 100 ) ;
3223
+ let rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 0 ) , 100 , 42 ) ;
3207
3224
3208
3225
assert ! ( rollup. requires_backpressure( 99 ) ) ;
3209
3226
assert ! ( rollup. requires_backpressure( 100 ) ) ;
3210
3227
assert ! ( !rollup. requires_backpressure( 101 ) ) ;
3211
3228
3212
- let zero_rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 0 ) ;
3229
+ let zero_rollup = RollupPerCollection :: new ( LogPosition :: from_offset ( 10 ) , 0 , 42 ) ;
3213
3230
assert ! ( !zero_rollup. requires_backpressure( 1 ) ) ;
3214
3231
assert ! ( zero_rollup. requires_backpressure( 0 ) ) ;
3215
3232
}
0 commit comments