@@ -42,11 +42,12 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{FileLock, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.util
+import java.util.stream.Collectors
 
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -629,7 +630,7 @@ class LogManager(logDirs: Seq[File],
       initialTaskDelayMs)
     }
     if (cleanerConfig.enableCleaner) {
-      _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
+      _cleaner = new LogCleaner(cleanerConfig, liveLogDirs.asJava, currentLogs, logDirFailureChannel, time)
       _cleaner.startup()
     }
   }
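
Note on the hunk above: the Java-side `LogCleaner` constructor takes Java collections, so the Scala `Seq` of live log directories is bridged with `asJava`. A minimal standalone sketch of that conversion, assuming `scala.jdk.CollectionConverters` is in scope (the directory paths are made up):

```scala
import java.io.File
import scala.jdk.CollectionConverters._

// Bridge a Scala Seq[File] to the java.util.List[File] a Java constructor expects.
val liveLogDirs: Seq[File] = Seq(new File("/tmp/kafka-logs-0"), new File("/tmp/kafka-logs-1"))
val javaDirs: java.util.List[File] = liveLogDirs.asJava // wrapper view, no copy
```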
@@ -894,7 +895,7 @@ class LogManager(logDirs: Seq[File],
    */
   private def resumeCleaning(topicPartition: TopicPartition): Unit = {
     if (cleaner != null) {
-      cleaner.resumeCleaning(Seq(topicPartition))
+      cleaner.resumeCleaning(util.Set.of(topicPartition))
       info(s"Cleaning for partition $topicPartition is resumed")
     }
   }
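
The resume API now takes a `java.util.Set` rather than a Scala `Seq`, hence `util.Set.of(topicPartition)`. A sketch of that factory (topic name hypothetical); `util.Set.of` builds an immutable set, which is fine here since the callee only reads it:

```scala
import java.util
import org.apache.kafka.common.TopicPartition

// util.Set.of creates an immutable, null-rejecting Java set,
// suitable for a read-only parameter like resumeCleaning's.
val tp = new TopicPartition("orders", 0) // hypothetical partition
val toResume: util.Set[TopicPartition] = util.Set.of(tp)
```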
@@ -1286,7 +1287,7 @@ class LogManager(logDirs: Seq[File],
     if (cleaner != null && !isFuture) {
       cleaner.abortCleaning(topicPartition)
       if (checkpoint) {
-        cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
+        cleaner.updateCheckpoints(removedLog.parentDirFile, Optional.of(topicPartition))
       }
     }
     if (isStray) {
@@ -1344,7 +1345,7 @@ class LogManager(logDirs: Seq[File],
 
     val logsByDirCached = logsByDir
     logDirs.foreach { logDir =>
-      if (cleaner != null) cleaner.updateCheckpoints(logDir)
+      if (cleaner != null) cleaner.updateCheckpoints(logDir, Optional.empty())
       val logsToCheckpoint = logsInDir(logsByDirCached, logDir)
       checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
       checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
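
Both `updateCheckpoints` call sites above now pass a `java.util.Optional` where the old Scala signature used a defaulted `Option` parameter: `Optional.of(partition)` when a partition's checkpoint entry is being removed, `Optional.empty()` for a plain checkpoint refresh. A small sketch of the Option-to-Optional mapping, using `scala.jdk.OptionConverters` for the bridging case (values hypothetical):

```scala
import java.util.Optional
import scala.jdk.OptionConverters._

// The two shapes used at the call sites above:
val removeOne: Optional[String]  = Optional.of("orders-0") // was partitionToRemove = Option(tp)
val removeNone: Optional[String] = Optional.empty()        // was the omitted default parameter
// Bridging an existing Scala Option when one is already in hand:
val bridged: Optional[String] = Some("orders-0").toJava
```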
@@ -1382,19 +1383,22 @@ class LogManager(logDirs: Seq[File],
     val startMs = time.milliseconds
 
     // clean current logs.
-    val deletableLogs = {
+    val deletableLogs: util.Map[TopicPartition, UnifiedLog] = {
       if (cleaner != null) {
         // prevent cleaner from working on same partitions when changing cleanup policy
         cleaner.pauseCleaningForNonCompactedPartitions()
       } else {
-        currentLogs.asScala.filter {
-          case (_, log) => !log.config.compact
-        }
+        currentLogs.entrySet().stream()
+          .filter(e => !e.getValue.config.compact)
+          .collect(Collectors.toMap(
+            (e: util.Map.Entry[TopicPartition, UnifiedLog]) => e.getKey,
+            (e: util.Map.Entry[TopicPartition, UnifiedLog]) => e.getValue
+          ))
       }
     }
 
     try {
-      deletableLogs.foreach {
+      deletableLogs.forEach {
         case (topicPartition, log) =>
           debug(s"Garbage collecting '${log.name}'")
           total += log.deleteOldSegments()
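
With `deletableLogs` now typed as a `java.util.Map`, the Scala `asScala.filter` turns into a Java stream pipeline collected with `Collectors.toMap`. A self-contained sketch of the same filter-and-collect idiom on plain strings (partition names and policies are made up):

```scala
import java.util
import java.util.stream.Collectors

// Keep only entries whose cleanup policy is not "compact",
// mirroring the deletableLogs pipeline above.
val policies: util.Map[String, String] = util.Map.of("orders-0", "delete", "users-0", "compact")
val deletable: util.Map[String, String] = policies.entrySet().stream()
  .filter(e => e.getValue != "compact")
  .collect(Collectors.toMap(
    (e: util.Map.Entry[String, String]) => e.getKey,
    (e: util.Map.Entry[String, String]) => e.getValue
  ))
// deletable now contains only "orders-0" -> "delete"
```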
@@ -1408,7 +1412,7 @@ class LogManager(logDirs: Seq[File],
       }
     } finally {
       if (cleaner != null) {
-        cleaner.resumeCleaning(deletableLogs.map(_._1))
+        cleaner.resumeCleaning(deletableLogs.keySet())
       }
     }
 
@@ -1548,7 +1552,7 @@ object LogManager {
     LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     val defaultLogConfig = new LogConfig(defaultProps)
 
-    val cleanerConfig = LogCleaner.cleanerConfig(config)
+    val cleanerConfig = new CleanerConfig(config)
     val transactionLogConfig = new TransactionLogConfig(config)
 
     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
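
The final hunk drops the Scala `LogCleaner.cleanerConfig(config)` helper in favor of constructing `CleanerConfig` directly from the broker config. A hedged sketch of the new call site, assuming (per this diff) that the Java storage module exposes a constructor accepting the broker's `KafkaConfig`:

```scala
import kafka.server.KafkaConfig
import org.apache.kafka.storage.internals.log.CleanerConfig

// Sketch: build cleaner settings straight from the broker config, as the
// hunk above does, instead of going through the removed Scala helper.
def buildCleanerConfig(config: KafkaConfig): CleanerConfig =
  new CleanerConfig(config)
```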