8
8
9
9
package org .opensearch .index .translog ;
10
10
11
+ import org .apache .logging .log4j .LogManager ;
11
12
import org .apache .logging .log4j .Logger ;
13
+ import org .opensearch .action .LatchedActionListener ;
12
14
import org .opensearch .cluster .service .ClusterService ;
13
15
import org .opensearch .common .blobstore .BlobMetadata ;
14
16
import org .opensearch .common .collect .Tuple ;
33
35
import java .util .Optional ;
34
36
import java .util .Set ;
35
37
import java .util .TreeSet ;
38
+ import java .util .concurrent .CountDownLatch ;
39
+ import java .util .concurrent .TimeUnit ;
40
+ import java .util .concurrent .atomic .AtomicLong ;
36
41
import java .util .function .BooleanSupplier ;
37
42
import java .util .function .LongConsumer ;
38
43
import java .util .function .LongSupplier ;
52
57
*/
53
58
public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
54
59
60
+ private static Logger staticLogger = LogManager .getLogger (RemoteFsTimestampAwareTranslog .class );
55
61
private final Logger logger ;
56
62
private final Map <Long , String > metadataFilePinnedTimestampMap ;
57
63
// For metadata files, with no min generation in the name, we cache generation data to avoid multiple reads.
58
64
private final Map <String , Tuple <Long , Long >> oldFormatMetadataFileGenerationMap ;
59
65
private final Map <String , Tuple <Long , Long >> oldFormatMetadataFilePrimaryTermMap ;
66
+ private final AtomicLong minPrimaryTermInRemote = new AtomicLong (Long .MAX_VALUE );
60
67
61
68
public RemoteFsTimestampAwareTranslog (
62
69
TranslogConfig config ,
@@ -167,7 +174,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
167
174
return ;
168
175
}
169
176
170
- List <String > metadataFilesToBeDeleted = getMetadataFilesToBeDeleted (metadataFiles );
177
+ List <String > metadataFilesToBeDeleted = getMetadataFilesToBeDeleted (
178
+ metadataFiles ,
179
+ metadataFilePinnedTimestampMap ,
180
+ logger
181
+ );
171
182
172
183
// If index is not deleted, make sure to keep latest metadata file
173
184
if (indexDeleted == false ) {
@@ -186,41 +197,35 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
186
197
metadataFilesNotToBeDeleted .removeAll (metadataFilesToBeDeleted );
187
198
188
199
logger .debug (() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted );
189
- Map < Long , Set <Long >> generationsToBeDeletedToPrimaryTermRangeMap = getGenerationsToBeDeleted (
200
+ Set <Long > generationsToBeDeleted = getGenerationsToBeDeleted (
190
201
metadataFilesNotToBeDeleted ,
191
202
metadataFilesToBeDeleted ,
192
203
indexDeleted
193
204
);
194
205
195
- logger .debug (() -> "generationsToBeDeletedToPrimaryTermRangeMap = " + generationsToBeDeletedToPrimaryTermRangeMap );
196
- if (generationsToBeDeletedToPrimaryTermRangeMap .isEmpty () == false ) {
206
+ logger .debug (() -> "generationsToBeDeleted = " + generationsToBeDeleted );
207
+ if (generationsToBeDeleted .isEmpty () == false ) {
197
208
// Delete stale generations
198
- Map <Long , Set <Long >> primaryTermToGenerationsMap = getPrimaryTermToGenerationsMap (
199
- generationsToBeDeletedToPrimaryTermRangeMap
200
- );
201
209
translogTransferManager .deleteGenerationAsync (
202
- primaryTermToGenerationsMap ,
210
+ primaryTermSupplier .getAsLong (),
211
+ generationsToBeDeleted ,
203
212
remoteGenerationDeletionPermits ::release
204
213
);
205
- } else {
206
- remoteGenerationDeletionPermits .release ();
207
- }
208
214
209
- if (metadataFilesToBeDeleted .isEmpty () == false ) {
210
215
// Delete stale metadata files
211
216
translogTransferManager .deleteMetadataFilesAsync (
212
217
metadataFilesToBeDeleted ,
213
218
remoteGenerationDeletionPermits ::release
214
219
);
215
- } else {
216
- remoteGenerationDeletionPermits .release ();
217
- }
218
220
219
- // Update cache to keep only those metadata files that are not getting deleted
220
- oldFormatMetadataFileGenerationMap .keySet ().retainAll (metadataFilesNotToBeDeleted );
221
+ // Update cache to keep only those metadata files that are not getting deleted
222
+ oldFormatMetadataFileGenerationMap .keySet ().retainAll (metadataFilesNotToBeDeleted );
221
223
222
- // Delete stale primary terms
223
- deleteStaleRemotePrimaryTerms (metadataFiles );
224
+ // Delete stale primary terms
225
+ deleteStaleRemotePrimaryTerms (metadataFilesNotToBeDeleted );
226
+ } else {
227
+ remoteGenerationDeletionPermits .release (REMOTE_DELETION_PERMITS );
228
+ }
224
229
} catch (Exception e ) {
225
230
remoteGenerationDeletionPermits .release (REMOTE_DELETION_PERMITS );
226
231
}
@@ -235,21 +240,8 @@ public void onFailure(Exception e) {
235
240
translogTransferManager .listTranslogMetadataFilesAsync (listMetadataFilesListener );
236
241
}
237
242
238
- protected Map <Long , Set <Long >> getPrimaryTermToGenerationsMap (Map <Long , Set <Long >> generationsToBeDeletedToPrimaryTermRangeMap ) {
239
- Map <Long , Set <Long >> primaryTermToGenerationsMap = new HashMap <>();
240
- for (Map .Entry <Long , Set <Long >> entry : generationsToBeDeletedToPrimaryTermRangeMap .entrySet ()) {
241
- for (Long primaryTerm : entry .getValue ()) {
242
- if (primaryTermToGenerationsMap .containsKey (primaryTerm ) == false ) {
243
- primaryTermToGenerationsMap .put (primaryTerm , new HashSet <>());
244
- }
245
- primaryTermToGenerationsMap .get (primaryTerm ).add (entry .getKey ());
246
- }
247
- }
248
- return primaryTermToGenerationsMap ;
249
- }
250
-
251
243
// Visible for testing
252
- protected Map < Long , Set <Long > > getGenerationsToBeDeleted (
244
+ protected Set <Long > getGenerationsToBeDeleted (
253
245
List <String > metadataFilesNotToBeDeleted ,
254
246
List <String > metadataFilesToBeDeleted ,
255
247
boolean indexDeleted
@@ -260,60 +252,36 @@ protected Map<Long, Set<Long>> getGenerationsToBeDeleted(
260
252
maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings ().getRemoteTranslogExtraKeep ();
261
253
}
262
254
263
- Map < Long , Set <String >> generationsFromMetadataFilesToBeDeleted = new HashMap <>();
255
+ Set <Long > generationsFromMetadataFilesToBeDeleted = new HashSet <>();
264
256
for (String mdFile : metadataFilesToBeDeleted ) {
265
257
Tuple <Long , Long > minMaxGen = getMinMaxTranslogGenerationFromMetadataFile (mdFile , translogTransferManager );
266
- List <Long > generations = LongStream .rangeClosed (minMaxGen .v1 (), minMaxGen .v2 ()).boxed ().collect (Collectors .toList ());
267
- for (Long generation : generations ) {
268
- if (generationsFromMetadataFilesToBeDeleted .containsKey (generation ) == false ) {
269
- generationsFromMetadataFilesToBeDeleted .put (generation , new HashSet <>());
270
- }
271
- generationsFromMetadataFilesToBeDeleted .get (generation ).add (mdFile );
272
- }
273
- }
274
-
275
- for (String mdFile : metadataFilesNotToBeDeleted ) {
276
- Tuple <Long , Long > minMaxGen = getMinMaxTranslogGenerationFromMetadataFile (mdFile , translogTransferManager );
277
- List <Long > generations = LongStream .rangeClosed (minMaxGen .v1 (), minMaxGen .v2 ()).boxed ().collect (Collectors .toList ());
278
- for (Long generation : generations ) {
279
- if (generationsFromMetadataFilesToBeDeleted .containsKey (generation )) {
280
- generationsFromMetadataFilesToBeDeleted .get (generation ).add (mdFile );
281
- }
282
- }
258
+ generationsFromMetadataFilesToBeDeleted .addAll (
259
+ LongStream .rangeClosed (minMaxGen .v1 (), minMaxGen .v2 ()).boxed ().collect (Collectors .toList ())
260
+ );
283
261
}
284
262
285
263
Map <String , Tuple <Long , Long >> metadataFileNotToBeDeletedGenerationMap = getGenerationForMetadataFiles (metadataFilesNotToBeDeleted );
286
264
TreeSet <Tuple <Long , Long >> pinnedGenerations = getOrderedPinnedMetadataGenerations (metadataFileNotToBeDeletedGenerationMap );
287
- Map < Long , Set <Long >> generationsToBeDeletedToPrimaryTermRangeMap = new HashMap <>();
288
- for (long generation : generationsFromMetadataFilesToBeDeleted . keySet () ) {
265
+ Set <Long > generationsToBeDeleted = new HashSet <>();
266
+ for (long generation : generationsFromMetadataFilesToBeDeleted ) {
289
267
// Check if the generation is not referred by metadata file matching pinned timestamps
290
268
if (generation <= maxGenerationToBeDeleted && isGenerationPinned (generation , pinnedGenerations ) == false ) {
291
- generationsToBeDeletedToPrimaryTermRangeMap .put (
292
- generation ,
293
- getPrimaryTermRange (generationsFromMetadataFilesToBeDeleted .get (generation ), translogTransferManager )
294
- );
269
+ generationsToBeDeleted .add (generation );
295
270
}
296
271
}
297
- return generationsToBeDeletedToPrimaryTermRangeMap ;
272
+ return generationsToBeDeleted ;
298
273
}
299
274
300
- protected Set <Long > getPrimaryTermRange (Set <String > metadataFiles , TranslogTransferManager translogTransferManager ) throws IOException {
301
- Tuple <Long , Long > primaryTermRange = new Tuple <>(Long .MIN_VALUE , Long .MAX_VALUE );
302
- for (String metadataFile : metadataFiles ) {
303
- Tuple <Long , Long > primaryTermRangeForMdFile = getMinMaxPrimaryTermFromMetadataFile (metadataFile , translogTransferManager );
304
- primaryTermRange = new Tuple <>(
305
- Math .max (primaryTermRange .v1 (), primaryTermRangeForMdFile .v1 ()),
306
- Math .min (primaryTermRange .v2 (), primaryTermRangeForMdFile .v2 ())
307
- );
308
- if (primaryTermRange .v1 ().equals (primaryTermRange .v2 ())) {
309
- break ;
310
- }
311
- }
312
- return LongStream .rangeClosed (primaryTermRange .v1 (), primaryTermRange .v2 ()).boxed ().collect (Collectors .toSet ());
275
+ protected List <String > getMetadataFilesToBeDeleted (List <String > metadataFiles ) {
276
+ return getMetadataFilesToBeDeleted (metadataFiles , metadataFilePinnedTimestampMap , logger );
313
277
}
314
278
315
279
// Visible for testing
316
- protected List <String > getMetadataFilesToBeDeleted (List <String > metadataFiles ) {
280
+ protected static List <String > getMetadataFilesToBeDeleted (
281
+ List <String > metadataFiles ,
282
+ Map <Long , String > metadataFilePinnedTimestampMap ,
283
+ Logger logger
284
+ ) {
317
285
Tuple <Long , Set <Long >> pinnedTimestampsState = RemoteStorePinnedTimestampService .getPinnedTimestamps ();
318
286
319
287
// Keep files since last successful run of scheduler
@@ -404,8 +372,94 @@ protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
404
372
}
405
373
}
406
374
407
- protected Tuple <Long , Long > getMinMaxPrimaryTermFromMetadataFile (String metadataFile , TranslogTransferManager translogTransferManager )
408
- throws IOException {
375
+ private void deleteStaleRemotePrimaryTerms (List <String > metadataFiles ) {
376
+ deleteStaleRemotePrimaryTerms (
377
+ metadataFiles ,
378
+ translogTransferManager ,
379
+ oldFormatMetadataFilePrimaryTermMap ,
380
+ minPrimaryTermInRemote ,
381
+ logger
382
+ );
383
+ }
384
+
385
+ /**
386
+ * This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures
387
+ * implicitly that minimum primary term in latest translog metadata in remote store is the current primary term.
388
+ * <br>
389
+ * This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
390
+ */
391
+ private static void deleteStaleRemotePrimaryTerms (
392
+ List <String > metadataFiles ,
393
+ TranslogTransferManager translogTransferManager ,
394
+ Map <String , Tuple <Long , Long >> oldFormatMetadataFilePrimaryTermMap ,
395
+ AtomicLong minPrimaryTermInRemote ,
396
+ Logger logger
397
+ ) {
398
+ // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
399
+ // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
400
+ // of older primary term.
401
+ if (metadataFiles .isEmpty ()) {
402
+ logger .trace ("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms" );
403
+ return ;
404
+ }
405
+ Optional <Long > minPrimaryTermFromMetadataFiles = metadataFiles .stream ().map (file -> {
406
+ try {
407
+ return getMinMaxPrimaryTermFromMetadataFile (file , translogTransferManager , oldFormatMetadataFilePrimaryTermMap ).v1 ();
408
+ } catch (IOException e ) {
409
+ return Long .MAX_VALUE ;
410
+ }
411
+ }).min (Long ::compareTo );
412
+ // First we delete all stale primary terms folders from remote store
413
+ long minimumReferencedPrimaryTerm = minPrimaryTermFromMetadataFiles .get () - 1 ;
414
+ Long minPrimaryTerm = getMinPrimaryTermInRemote (minPrimaryTermInRemote , translogTransferManager , logger );
415
+ if (minimumReferencedPrimaryTerm > minPrimaryTerm ) {
416
+ translogTransferManager .deletePrimaryTermsAsync (minimumReferencedPrimaryTerm );
417
+ minPrimaryTermInRemote .set (minimumReferencedPrimaryTerm );
418
+ } else {
419
+ logger .debug (
420
+ "Skipping primary term cleanup. minimumReferencedPrimaryTerm = {}, minPrimaryTermInRemote = {}" ,
421
+ minimumReferencedPrimaryTerm ,
422
+ minPrimaryTermInRemote
423
+ );
424
+ }
425
+ }
426
+
427
+ private static Long getMinPrimaryTermInRemote (
428
+ AtomicLong minPrimaryTermInRemote ,
429
+ TranslogTransferManager translogTransferManager ,
430
+ Logger logger
431
+ ) {
432
+ if (minPrimaryTermInRemote .get () == Long .MAX_VALUE ) {
433
+ CountDownLatch latch = new CountDownLatch (1 );
434
+ translogTransferManager .listPrimaryTermsInRemoteAsync (new LatchedActionListener <>(new ActionListener <>() {
435
+ @ Override
436
+ public void onResponse (Set <Long > primaryTermsInRemote ) {
437
+ Optional <Long > minPrimaryTerm = primaryTermsInRemote .stream ().min (Long ::compareTo );
438
+ minPrimaryTerm .ifPresent (minPrimaryTermInRemote ::set );
439
+ }
440
+
441
+ @ Override
442
+ public void onFailure (Exception e ) {
443
+ logger .error ("Exception while fetching min primary term from remote translog" , e );
444
+ }
445
+ }, latch ));
446
+
447
+ try {
448
+ if (latch .await (5 , TimeUnit .MINUTES ) == false ) {
449
+ logger .error ("Timeout while fetching min primary term from remote translog" );
450
+ }
451
+ } catch (InterruptedException e ) {
452
+ logger .error ("Exception while fetching min primary term from remote translog" , e );
453
+ }
454
+ }
455
+ return minPrimaryTermInRemote .get ();
456
+ }
457
+
458
+ protected static Tuple <Long , Long > getMinMaxPrimaryTermFromMetadataFile (
459
+ String metadataFile ,
460
+ TranslogTransferManager translogTransferManager ,
461
+ Map <String , Tuple <Long , Long >> oldFormatMetadataFilePrimaryTermMap
462
+ ) throws IOException {
409
463
Tuple <Long , Long > minMaxPrimaryTermFromFileName = TranslogTransferMetadata .getMinMaxPrimaryTermFromFilename (metadataFile );
410
464
if (minMaxPrimaryTermFromFileName != null ) {
411
465
return minMaxPrimaryTermFromFileName ;
@@ -434,27 +488,51 @@ protected Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(String metadata
434
488
}
435
489
}
436
490
437
- /**
438
- * This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures
439
- * implicitly that minimum primary term in latest translog metadata in remote store is the current primary term.
440
- * <br>
441
- * This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
442
- */
443
- private void deleteStaleRemotePrimaryTerms (List <String > metadataFiles ) {
444
- // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
445
- // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
446
- // of older primary term.
447
- if (olderPrimaryCleaned .trySet (Boolean .TRUE )) {
448
- if (metadataFiles .isEmpty ()) {
449
- logger .trace ("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms" );
450
- return ;
491
+ public static void cleanup (TranslogTransferManager translogTransferManager ) throws IOException {
492
+ ActionListener <List <BlobMetadata >> listMetadataFilesListener = new ActionListener <>() {
493
+ @ Override
494
+ public void onResponse (List <BlobMetadata > blobMetadata ) {
495
+ List <String > metadataFiles = blobMetadata .stream ().map (BlobMetadata ::name ).collect (Collectors .toList ());
496
+
497
+ try {
498
+ if (metadataFiles .isEmpty ()) {
499
+ staticLogger .debug ("No stale translog metadata files found" );
500
+ return ;
501
+ }
502
+ List <String > metadataFilesToBeDeleted = getMetadataFilesToBeDeleted (metadataFiles , new HashMap <>(), staticLogger );
503
+ if (metadataFilesToBeDeleted .isEmpty ()) {
504
+ staticLogger .debug ("No metadata files to delete" );
505
+ return ;
506
+ }
507
+ staticLogger .debug (() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted );
508
+
509
+ // For all the files that we are keeping, fetch min and max generations
510
+ List <String > metadataFilesNotToBeDeleted = new ArrayList <>(metadataFiles );
511
+ metadataFilesNotToBeDeleted .removeAll (metadataFilesToBeDeleted );
512
+ staticLogger .debug (() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted );
513
+
514
+ // Delete stale metadata files
515
+ translogTransferManager .deleteMetadataFilesAsync (
516
+ metadataFilesToBeDeleted ,
517
+ // Delete stale primary terms
518
+ () -> deleteStaleRemotePrimaryTerms (
519
+ metadataFilesNotToBeDeleted ,
520
+ translogTransferManager ,
521
+ new HashMap <>(),
522
+ new AtomicLong (Long .MAX_VALUE ),
523
+ staticLogger
524
+ )
525
+ );
526
+ } catch (Exception e ) {
527
+ staticLogger .error ("Exception while cleaning up metadata and primary terms" , e );
528
+ }
451
529
}
452
- Optional < Long > minPrimaryTerm = metadataFiles . stream ()
453
- . map ( file -> RemoteStoreUtils . invertLong ( file . split ( METADATA_SEPARATOR )[ 1 ]))
454
- . min ( Long :: compareTo );
455
- // First we delete all stale primary terms folders from remote store
456
- long minimumReferencedPrimaryTerm = minPrimaryTerm . get () - 1 ;
457
- translogTransferManager . deletePrimaryTermsAsync ( minimumReferencedPrimaryTerm ) ;
458
- }
530
+
531
+ @ Override
532
+ public void onFailure ( Exception e ) {
533
+ staticLogger . error ( "Exception while cleaning up metadata and primary terms" , e );
534
+ }
535
+ } ;
536
+ translogTransferManager . listTranslogMetadataFilesAsync ( listMetadataFilesListener );
459
537
}
460
538
}
0 commit comments