@@ -89,6 +89,8 @@ impl LogMergePolicy {
89
89
90
90
impl MergePolicy for LogMergePolicy {
91
91
fn compute_merge_candidates ( & self , segments : & [ SegmentMeta ] ) -> Vec < MergeCandidate > {
92
+ // Filter for segments that have fewer than the target number of docs, count total unmerged
93
+ // docs, and sort in descending order
92
94
let mut unmerged_docs = 0 ;
93
95
let mut levels = segments
94
96
. iter ( )
@@ -98,11 +100,13 @@ impl MergePolicy for LogMergePolicy {
98
100
. sorted_by ( |( a, _) , ( b, _) | b. cmp ( a) )
99
101
. collect_vec ( ) ;
100
102
103
+ // If there are enough unmerged documents to create a new segment of the target size,
104
+ // then create a merge candidate for them.
101
105
let mut candidates = Vec :: new ( ) ;
102
106
if unmerged_docs >= self . target_segment_size {
103
107
let mut batch_docs = 0 ;
104
108
let mut batch = Vec :: new ( ) ;
105
- // Pop segments segments from levels, smallest first due to sort at start
109
+ // Start with the smallest segments and add them to the batch until we reach the target
106
110
while let Some ( ( docs, seg) ) = levels. pop ( ) {
107
111
batch_docs += docs;
108
112
batch. push ( seg) ;
@@ -116,6 +120,8 @@ impl MergePolicy for LogMergePolicy {
116
120
// drain to reuse the buffer
117
121
batch. drain ( ..) . map ( |seg| seg. id ( ) ) . collect ( ) ,
118
122
) ) ;
123
+ // If there aren't enough documents to create another segment of the target size
124
+ // then break
119
125
if unmerged_docs <= self . target_segment_size {
120
126
break ;
121
127
}
@@ -127,19 +133,18 @@ impl MergePolicy for LogMergePolicy {
127
133
let mut batch = Vec :: new ( ) ;
128
134
levels
129
135
. iter ( )
130
- . map ( |( docs, seg ) | {
136
+ . chunk_by ( |( docs, _ ) | {
131
137
let segment_log_size = f64:: from ( self . clip_min_size ( * docs as u32 ) ) . log2 ( ) ;
132
138
if segment_log_size < ( current_max_log_size - self . level_log_size ) {
133
139
// update current_max_log_size to create a new group
134
140
current_max_log_size = segment_log_size;
135
141
}
136
- ( current_max_log_size, seg )
142
+ current_max_log_size
137
143
} )
138
- . chunk_by ( |( level, _) | * level)
139
144
. into_iter ( )
140
145
. for_each ( |( _, group) | {
141
146
let mut hit_delete_threshold = false ;
142
- group. into_iter ( ) . for_each ( |( _, seg) | {
147
+ group. for_each ( |( _, seg) | {
143
148
batch. push ( seg. id ( ) ) ;
144
149
if !hit_delete_threshold && self . segment_above_deletes_threshold ( seg) {
145
150
hit_delete_threshold = true ;
@@ -354,12 +359,14 @@ mod tests {
354
359
355
360
#[ test]
356
361
fn test_skip_merge_large_segments ( ) {
362
+ // All of these should be merged into a single segment since 2 * 49_999 < 100_000
357
363
let test_input_merge_all = vec ! [
358
364
create_random_segment_meta( 49_999 ) ,
359
365
create_random_segment_meta( 49_999 ) ,
360
366
create_random_segment_meta( 49_999 ) ,
361
367
] ;
362
368
369
+ // Only two of these should be merged since 2 * 50_000 >= 100_000, so the third is left out of that merge
363
370
let test_input_merge_two = vec ! [
364
371
create_random_segment_meta( 50_000 ) ,
365
372
create_random_segment_meta( 50_000 ) ,
@@ -371,8 +378,8 @@ mod tests {
371
378
let result_list_merge_two =
372
379
test_merge_policy ( ) . compute_merge_candidates ( & test_input_merge_two) ;
373
380
374
- assert_eq ! ( result_list_merge_two[ 0 ] . 0 . len( ) , 2 ) ;
375
381
assert_eq ! ( result_list_merge_all[ 0 ] . 0 . len( ) , 3 ) ;
382
+ assert_eq ! ( result_list_merge_two[ 0 ] . 0 . len( ) , 2 ) ;
376
383
}
377
384
378
385
#[ test]
0 commit comments