Commit 7710d1c

KAFKA-14487: Move LogManager static methods/fields to storage module (#19302)

Move the static fields and methods of the Scala LogManager companion object into the Java LogManager class in the storage module.

Reviewers: Luke Chen <[email protected]>

1 parent 8fa0d97 · commit 7710d1c

11 files changed (+301 −234 lines)

Diff for: build.gradle (+1)

```diff
@@ -2230,6 +2230,7 @@ project(':storage') {
   }

   dependencies {
+    implementation project(':metadata')
     implementation project(':storage:storage-api')
     implementation project(':server-common')
     implementation project(':clients')
```

Diff for: checkstyle/import-control-storage.xml (+2)

```diff
@@ -94,6 +94,8 @@
     <allow pkg="com.yammer.metrics.core" />
     <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.config" />
+    <allow pkg="org.apache.kafka.image" />
+    <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage.internals"/>
     <allow pkg="org.apache.kafka.storage.log.metrics"/>
```

Diff for: core/src/main/scala/kafka/log/LogManager.scala (+5 −68)

```diff
@@ -23,7 +23,6 @@ import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.server.{KafkaConfig, KafkaRaftServer}
-import kafka.server.metadata.BrokerMetadataPublisher.info
 import kafka.utils.threadsafe
 import kafka.utils.{CoreUtils, Logging, Pool}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
@@ -42,7 +41,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{FileLock, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogManager => JLogManager, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats

@@ -80,8 +79,6 @@ class LogManager(logDirs: Seq[File],
                  remoteStorageSystemEnable: Boolean,
                  val initialTaskDelayMs: Long) extends Logging {

-  import LogManager._
-
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)

   private val logCreationOrDeletionLock = new Object
@@ -127,9 +124,9 @@
   def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet

   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
-    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
+    (dir, new OffsetCheckpointFile(new File(dir, JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
-    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
+    (dir, new OffsetCheckpointFile(new File(dir, JLogManager.LOG_START_OFFSET_CHECKPOINT_FILE), logDirFailureChannel))).toMap

   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()

@@ -261,7 +258,7 @@
   private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
     dirs.flatMap { dir =>
       try {
-        val lock = new FileLock(new File(dir, LockFileName))
+        val lock = new FileLock(new File(dir, JLogManager.LOCK_FILE_NAME))
         if (!lock.tryLock())
           throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
             ". A Kafka instance in another process or thread is using this directory.")
@@ -680,7 +677,7 @@

     try {
       jobs.foreachEntry { (dir, dirJobs) =>
-        if (waitForAllToComplete(dirJobs,
+        if (JLogManager.waitForAllToComplete(dirJobs.toList.asJava,
           e => warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}"))) {
           val logs = logsInDir(localLogsByDir, dir)

@@ -1520,25 +1517,6 @@
 }

 object LogManager {
-  val LockFileName = ".lock"
-
-  /**
-   * Wait all jobs to complete
-   * @param jobs jobs
-   * @param callback this will be called to handle the exception caused by each Future#get
-   * @return true if all pass. Otherwise, false
-   */
-  private[log] def waitForAllToComplete(jobs: Seq[Future[_]], callback: Throwable => Unit): Boolean = {
-    jobs.count(future => Try(future.get) match {
-      case Success(_) => false
-      case Failure(e) =>
-        callback(e)
-        true
-    }) == 0
-  }
-
-  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
-  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"

   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
@@ -1575,45 +1553,4 @@
       remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled,
       initialTaskDelayMs = config.logInitialTaskDelayMs)
   }
-
-  /**
-   * Returns true if the given log should not be on the current broker
-   * according to the metadata image.
-   *
-   * @param brokerId The ID of the current broker.
-   * @param newTopicsImage The new topics image after broker has been reloaded
-   * @param log The log object to check
-   * @return true if the log should not exist on the broker, false otherwise.
-   */
-  def isStrayKraftReplica(
-    brokerId: Int,
-    newTopicsImage: TopicsImage,
-    log: UnifiedLog
-  ): Boolean = {
-    if (log.topicId.isEmpty) {
-      // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
-      // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
-      // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
-      // a stray log.
-      info(s"The topicId does not exist in $log, treat it as a stray log")
-      return true
-    }
-
-    val topicId = log.topicId.get
-    val partitionId = log.topicPartition.partition()
-    Option(newTopicsImage.getPartition(topicId, partitionId)) match {
-      case Some(partition) =>
-        if (!partition.replicas.contains(brokerId)) {
-          info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " +
-            s"does not contain the local brokerId $brokerId.")
-          true
-        } else {
-          false
-        }
-
-      case None =>
-        info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image")
-        true
-    }
-  }
 }
```
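The removed Scala members now live in org.apache.kafka.storage.internals.log.LogManager, imported above as JLogManager. That Java file is not part of this page, so the following is only a sketch of the relocated surface: the constant names come from the new call sites, their values from the removed Scala object, and the waitForAllToComplete signature is inferred from the call JLogManager.waitForAllToComplete(dirJobs.toList.asJava, e => ...). Exact modifiers and signatures are assumptions.

```java
package org.apache.kafka.storage.internals.log;

import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Sketch only; not the actual storage-module source.
public class LogManager {
    public static final String LOCK_FILE_NAME = ".lock";
    public static final String RECOVERY_POINT_CHECKPOINT_FILE = "recovery-point-offset-checkpoint";
    public static final String LOG_START_OFFSET_CHECKPOINT_FILE = "log-start-offset-checkpoint";

    /**
     * Wait for all jobs to complete.
     *
     * @param jobs     the futures to wait on
     * @param callback invoked with the exception thrown by each failing Future#get
     * @return true if every job completed without throwing, false otherwise
     */
    public static boolean waitForAllToComplete(List<Future<?>> jobs, Consumer<Throwable> callback) {
        int failed = 0;
        for (Future<?> job : jobs) {
            try {
                job.get(); // block until this job finishes
            } catch (Exception e) {
                callback.accept(e); // let the caller log the failure
                failed++;
            }
        }
        return failed == 0;
    }
}
```

At the Scala call site, dirJobs.toList.asJava adapts the Scala Seq[Future[_]] to a Java List, and the existing e => warn(...) lambda satisfies the Consumer[Throwable] parameter through SAM conversion, so the shutdown path needs no further changes.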

Diff for: core/src/main/scala/kafka/raft/RaftManager.scala (+2 −3)

```diff
@@ -24,7 +24,6 @@ import java.util.OptionalInt
 import java.util.concurrent.CompletableFuture
 import java.util.{Map => JMap}
 import java.util.{Collection => JCollection}
-import kafka.log.LogManager
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils
 import kafka.utils.Logging
@@ -48,7 +47,7 @@ import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.util.timer.SystemTimer
-import org.apache.kafka.storage.internals.log.UnifiedLog
+import org.apache.kafka.storage.internals.log.{LogManager, UnifiedLog}

 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
@@ -62,7 +61,7 @@ object KafkaRaftManager {
   }

   private def lockDataDir(dataDir: File): FileLock = {
-    val lock = new FileLock(new File(dataDir, LogManager.LockFileName))
+    val lock = new FileLock(new File(dataDir, LogManager.LOCK_FILE_NAME))

     if (!lock.tryLock()) {
       throw new KafkaException(
```
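KafkaRaftManager.lockDataDir and LogManager.lockLogDirs now share the ".lock" file name through the storage module's constant instead of the old Scala field. A minimal Java sketch of that shared pattern, assuming the FileLock constructor may throw IOException; the helper class and method names here are hypothetical, for illustration only:

```java
import java.io.File;
import java.io.IOException;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.server.util.FileLock;
import org.apache.kafka.storage.internals.log.LogManager;

// Hypothetical helper showing the shared lock-file pattern.
final class DataDirLock {
    static FileLock acquire(File dataDir) throws IOException {
        // One ".lock" file per data directory guards against two processes
        // (or threads) managing the same directory at once.
        FileLock lock = new FileLock(new File(dataDir, LogManager.LOCK_FILE_NAME));
        if (!lock.tryLock()) {
            throw new KafkaException("Failed to acquire lock on file .lock in " +
                dataDir.getAbsolutePath() +
                ". A Kafka instance in another process or thread is using this directory.");
        }
        return lock;
    }
}
```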

Diff for: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala (+2 −1)

```diff
@@ -34,6 +34,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
 import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.fault.FaultHandler
+import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}

 import java.util.concurrent.CompletableFuture
 import scala.collection.mutable
@@ -300,7 +301,7 @@
       // recovery-from-unclean-shutdown if required.
       logManager.startup(
         metadataCache.getAllTopics().asScala,
-        isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
+        isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
       )

       // Rename all future replicas which are in the same directory as the
```