Skip to content

Commit 50fb993

Browse files
authored
KAFKA-19136 Move metadata-related configs from KRaftConfigs to MetadataLogConfig (#19465)
Separates metadata-related configurations from the `KRaftConfigs` into the `MetadataLogConfig` class. Previously, metadata-related configs were placed in `KRaftConfigs`, which mixed server-related configs (like process.roles) with metadata-specific ones (like metadata.log.*), leading to confusion and tight coupling. In this PR: - Extract metadata-related config definitions and variables from `KRaftConfig` into `MetadataLogConfig`. - Move `node.id` out of `MetadataLogConfig` into `KafkaMetadataLog’s constructor` to avoid redundant config references. - Leave server-related configurations in `KRaftConfig`, consistent with its role. This separation makes `KafkaConfig` and `KRaftConfig` cleaner, and aligns with the goal of having a dedicated MetadataLogConfig class for managing metadata-specific configurations. Reviewers: PoAn Yang <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent db62c7c commit 50fb993

File tree

22 files changed

+232
-184
lines changed

22 files changed

+232
-184
lines changed

build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -2416,6 +2416,7 @@ project(':tools') {
24162416
implementation project(':group-coordinator')
24172417
implementation project(':coordinator-common')
24182418
implementation project(':share-coordinator')
2419+
implementation project(':raft')
24192420
implementation libs.argparse4j
24202421
implementation libs.jacksonDatabind
24212422
implementation libs.jacksonDataformatCsv

checkstyle/import-control.xml

+1
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@
295295
<allow pkg="org.apache.kafka.metadata" />
296296
<allow pkg="org.apache.kafka.metadata.properties" />
297297
<allow pkg="org.apache.kafka.network" />
298+
<allow pkg="org.apache.kafka.raft" />
298299
<allow pkg="org.apache.kafka.server.util" />
299300
<allow pkg="kafka.admin" />
300301
<allow pkg="kafka.server" />

core/src/main/scala/kafka/raft/KafkaMetadataLog.scala

+10-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.kafka.common.record.{MemoryRecords, Records}
2929
import org.apache.kafka.common.utils.{Time, Utils}
3030
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
3131
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
32-
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
32+
import org.apache.kafka.server.config.ServerLogConfigs
3333
import org.apache.kafka.server.storage.log.FetchIsolation
3434
import org.apache.kafka.server.util.Scheduler
3535
import org.apache.kafka.snapshot.FileRawSnapshotReader
@@ -58,10 +58,11 @@ final class KafkaMetadataLog private (
5858
// polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
5959
snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
6060
topicPartition: TopicPartition,
61-
config: MetadataLogConfig
61+
config: MetadataLogConfig,
62+
nodeId: Int
6263
) extends ReplicatedLog with Logging {
6364

64-
this.logIdent = s"[MetadataLog partition=$topicPartition, nodeId=${config.nodeId}] "
65+
this.logIdent = s"[MetadataLog partition=$topicPartition, nodeId=$nodeId] "
6566

6667
override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = {
6768
val isolation = readIsolation match {
@@ -581,7 +582,8 @@ object KafkaMetadataLog extends Logging {
581582
dataDir: File,
582583
time: Time,
583584
scheduler: Scheduler,
584-
config: MetadataLogConfig
585+
config: MetadataLogConfig,
586+
nodeId: Int
585587
): KafkaMetadataLog = {
586588
val props = new Properties()
587589
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
@@ -597,7 +599,7 @@ object KafkaMetadataLog extends Logging {
597599

598600
if (config.logSegmentBytes < config.logSegmentMinBytes) {
599601
throw new InvalidConfigurationException(
600-
s"Cannot set ${KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
602+
s"Cannot set ${MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
601603
)
602604
} else if (defaultLogConfig.retentionMs >= 0) {
603605
throw new InvalidConfigurationException(
@@ -631,12 +633,13 @@ object KafkaMetadataLog extends Logging {
631633
scheduler,
632634
recoverSnapshots(log),
633635
topicPartition,
634-
config
636+
config,
637+
nodeId
635638
)
636639

637640
// Print a warning if users have overridden the internal config
638641
if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
639-
metadataLog.error(s"Overriding ${KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
642+
metadataLog.error(s"Overriding ${MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
640643
s"this value too low may lead to an inability to write batches of metadata records.")
641644
}
642645

core/src/main/scala/kafka/raft/RaftManager.scala

+2-10
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateSt
4545
import org.apache.kafka.server.ProcessRole
4646
import org.apache.kafka.server.common.Feature
4747
import org.apache.kafka.server.common.serialization.RecordSerde
48-
import org.apache.kafka.server.config.ServerLogConfigs
4948
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
5049
import org.apache.kafka.server.fault.FaultHandler
5150
import org.apache.kafka.server.util.timer.SystemTimer
@@ -231,15 +230,8 @@ class KafkaRaftManager[T](
231230
dataDir,
232231
time,
233232
scheduler,
234-
config = new MetadataLogConfig(config.metadataLogSegmentBytes,
235-
config.metadataLogSegmentMinBytes,
236-
config.metadataLogSegmentMillis,
237-
config.metadataRetentionBytes,
238-
config.metadataRetentionMillis,
239-
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
240-
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
241-
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
242-
config.metadataNodeIDConfig)
233+
config = new MetadataLogConfig(config),
234+
config.nodeId
243235
)
244236
}
245237

core/src/main/scala/kafka/server/KafkaConfig.scala

+5-11
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
4040
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
4141
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
4242
import org.apache.kafka.network.SocketServerConfigs
43-
import org.apache.kafka.raft.QuorumConfig
43+
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
4444
import org.apache.kafka.security.authorizer.AuthorizerUtils
4545
import org.apache.kafka.server.ProcessRole
4646
import org.apache.kafka.server.authorizer.Authorizer
@@ -239,18 +239,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
239239
}
240240

241241
def metadataLogDir: String = {
242-
Option(getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG)) match {
242+
Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match {
243243
case Some(dir) => dir
244244
case None => logDirs.head
245245
}
246246
}
247247

248-
def metadataLogSegmentBytes = getInt(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG)
249-
def metadataLogSegmentMillis = getLong(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG)
250-
def metadataRetentionBytes = getLong(KRaftConfigs.METADATA_MAX_RETENTION_BYTES_CONFIG)
251-
def metadataRetentionMillis = getLong(KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_CONFIG)
252-
def metadataNodeIDConfig = getInt(KRaftConfigs.NODE_ID_CONFIG)
253-
def metadataLogSegmentMinBytes = getInt(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG)
254248
val serverMaxStartupTimeMs = getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG)
255249

256250
def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
@@ -264,10 +258,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
264258
}
265259

266260
/************* Metadata Configuration ***********/
267-
val metadataSnapshotMaxNewRecordBytes = getLong(KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG)
268-
val metadataSnapshotMaxIntervalMs = getLong(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG)
261+
val metadataSnapshotMaxNewRecordBytes = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG)
262+
val metadataSnapshotMaxIntervalMs = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG)
269263
val metadataMaxIdleIntervalNs: Option[Long] = {
270-
val value = TimeUnit.NANOSECONDS.convert(getInt(KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).toLong, TimeUnit.MILLISECONDS)
264+
val value = TimeUnit.NANOSECONDS.convert(getInt(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).toLong, TimeUnit.MILLISECONDS)
271265
if (value > 0) Some(value) else None
272266
}
273267

core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
3232
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
3333
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
3434
import org.apache.kafka.network.SocketServerConfigs
35+
import org.apache.kafka.raft.MetadataLogConfig
3536
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs}
3637
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
3738

@@ -74,7 +75,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
7475
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share"))
7576
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
7677
}
77-
cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
78+
cfgs.foreach(_.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
7879
insertControllerListenersIfNeeded(cfgs)
7980
cfgs.map(KafkaConfig.fromProps)
8081
}

core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
5656
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
5757
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
5858
import org.apache.kafka.network.SocketServerConfigs
59-
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
59+
import org.apache.kafka.raft.MetadataLogConfig
60+
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
6061
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
6162
import org.apache.kafka.server.record.BrokerCompressionType
6263
import org.apache.kafka.server.util.ShutdownableThread
@@ -1099,7 +1100,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
10991100
def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs(groupProtocol: String): Unit = {
11001101
// modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
11011102
val props = defaultStaticConfig(numServers)
1102-
props.put(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
1103+
props.put(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
11031104

11041105
val kafkaConfig = KafkaConfig.fromProps(props)
11051106
val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]

core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
3838
import org.apache.kafka.metadata.storage.Formatter
3939
import org.apache.kafka.network.SocketServerConfigs
4040
import org.apache.kafka.queue.KafkaEventQueue
41-
import org.apache.kafka.raft.QuorumConfig
41+
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
4242
import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
4343
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion, TransactionVersion}
4444
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
@@ -261,7 +261,7 @@ abstract class QuorumTestHarness extends Logging {
261261
}
262262
val nodeId = Integer.parseInt(props.getProperty(KRaftConfigs.NODE_ID_CONFIG))
263263
val metadataDir = TestUtils.tempDir()
264-
props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
264+
props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
265265
val proto = controllerListenerSecurityProtocol.toString
266266
val securityProtocolMaps = extraControllerSecurityProtocols().map(sc => sc + ":" + sc).mkString(",")
267267
val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",")

core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import kafka.utils.TestUtils
2121
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
2222
import org.apache.kafka.common.utils.BufferSupplier
2323
import org.apache.kafka.metadata.MetadataRecordSerde
24-
import org.apache.kafka.server.config.KRaftConfigs
24+
import org.apache.kafka.raft.MetadataLogConfig
2525
import org.apache.kafka.snapshot.RecordsSnapshotReader
2626
import org.junit.jupiter.api.Assertions.assertEquals
2727
import org.junit.jupiter.api.Assertions.assertNotEquals
@@ -48,8 +48,8 @@ class RaftClusterSnapshotTest {
4848
.setNumControllerNodes(numberOfControllers)
4949
.build()
5050
)
51-
.setConfigProp(KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, "10")
52-
.setConfigProp(KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, "0")
51+
.setConfigProp(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, "10")
52+
.setConfigProp(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, "0")
5353
.build()
5454
) { cluster =>
5555
cluster.format()

core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala

+13-36
Original file line numberDiff line numberDiff line change
@@ -76,35 +76,17 @@ final class KafkaMetadataLogTest {
7676
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
7777
props.put(KRaftConfigs.NODE_ID_CONFIG, Int.box(2))
7878
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
79-
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
80-
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
79+
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
80+
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
8181
assertThrows(classOf[InvalidConfigurationException], () => {
8282
val kafkaConfig = KafkaConfig.fromProps(props)
83-
val metadataConfig = new MetadataLogConfig(
84-
kafkaConfig.metadataLogSegmentBytes,
85-
kafkaConfig.metadataLogSegmentMinBytes,
86-
kafkaConfig.metadataLogSegmentMillis,
87-
kafkaConfig.metadataRetentionBytes,
88-
kafkaConfig.metadataRetentionMillis,
89-
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
90-
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
91-
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
92-
kafkaConfig.metadataNodeIDConfig)
83+
val metadataConfig = new MetadataLogConfig(kafkaConfig)
9384
buildMetadataLog(tempDir, mockTime, metadataConfig)
9485
})
9586

96-
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
87+
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
9788
val kafkaConfig = KafkaConfig.fromProps(props)
98-
val metadataConfig = new MetadataLogConfig(
99-
kafkaConfig.metadataLogSegmentBytes,
100-
kafkaConfig.metadataLogSegmentMinBytes,
101-
kafkaConfig.metadataLogSegmentMillis,
102-
kafkaConfig.metadataRetentionBytes,
103-
kafkaConfig.metadataRetentionMillis,
104-
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
105-
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
106-
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
107-
kafkaConfig.metadataNodeIDConfig)
89+
val metadataConfig = new MetadataLogConfig(kafkaConfig)
10890
buildMetadataLog(tempDir, mockTime, metadataConfig)
10991
}
11092

@@ -713,8 +695,7 @@ final class KafkaMetadataLogTest {
713695
DefaultMetadataLogConfig.retentionMillis,
714696
maxBatchSizeInBytes,
715697
DefaultMetadataLogConfig.maxFetchSizeInBytes,
716-
DefaultMetadataLogConfig.deleteDelayMillis,
717-
DefaultMetadataLogConfig.nodeId
698+
DefaultMetadataLogConfig.deleteDelayMillis
718699
)
719700
val log = buildMetadataLog(tempDir, mockTime, config)
720701

@@ -934,8 +915,7 @@ final class KafkaMetadataLogTest {
934915
60 * 1000,
935916
512,
936917
DefaultMetadataLogConfig.maxFetchSizeInBytes,
937-
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
938-
1
918+
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
939919
)
940920
val log = buildMetadataLog(tempDir, mockTime, config)
941921

@@ -972,8 +952,7 @@ final class KafkaMetadataLogTest {
972952
60 * 1000,
973953
100,
974954
DefaultMetadataLogConfig.maxBatchSizeInBytes,
975-
DefaultMetadataLogConfig.maxFetchSizeInBytes,
976-
DefaultMetadataLogConfig.nodeId
955+
DefaultMetadataLogConfig.maxFetchSizeInBytes
977956
)
978957
val log = buildMetadataLog(tempDir, mockTime, config)
979958

@@ -1007,8 +986,7 @@ final class KafkaMetadataLogTest {
1007986
60 * 1000,
1008987
100,
1009988
DefaultMetadataLogConfig.maxFetchSizeInBytes,
1010-
DefaultMetadataLogConfig.deleteDelayMillis,
1011-
DefaultMetadataLogConfig.nodeId
989+
DefaultMetadataLogConfig.deleteDelayMillis
1012990
)
1013991
val log = buildMetadataLog(tempDir, mockTime, config)
1014992

@@ -1052,8 +1030,7 @@ final class KafkaMetadataLogTest {
10521030
60 * 1000,
10531031
200,
10541032
DefaultMetadataLogConfig.maxFetchSizeInBytes,
1055-
DefaultMetadataLogConfig.deleteDelayMillis,
1056-
DefaultMetadataLogConfig.nodeId
1033+
DefaultMetadataLogConfig.deleteDelayMillis
10571034
)
10581035
val log = buildMetadataLog(tempDir, mockTime, config)
10591036

@@ -1112,8 +1089,7 @@ object KafkaMetadataLogTest {
11121089
60 * 1000,
11131090
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
11141091
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
1115-
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1116-
1
1092+
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
11171093
)
11181094

11191095
def buildMetadataLogAndDir(
@@ -1133,7 +1109,8 @@ object KafkaMetadataLogTest {
11331109
logDir,
11341110
time,
11351111
time.scheduler,
1136-
metadataLogConfig
1112+
metadataLogConfig,
1113+
1
11371114
)
11381115

11391116
(logDir.toPath, metadataLog, metadataLogConfig)

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse
3333
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
3434
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3535
import org.apache.kafka.server.common.RequestLocal
36-
import org.apache.kafka.server.config.KRaftConfigs
3736
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
3837
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManager, RemoteLogManagerConfig}
3938
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -57,6 +56,7 @@ import org.mockito.Mockito.{doAnswer, doThrow, spy}
5756
import net.jqwik.api.AfterFailureMode
5857
import net.jqwik.api.ForAll
5958
import net.jqwik.api.Property
59+
import org.apache.kafka.server.config.KRaftConfigs
6060

6161
import java.io._
6262
import java.nio.ByteBuffer

core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ import org.apache.kafka.common.Uuid
3131
import org.apache.kafka.common.metrics.Metrics
3232
import org.apache.kafka.common.utils.Time
3333
import org.apache.kafka.network.SocketServerConfigs
34-
import org.apache.kafka.raft.Endpoints
35-
import org.apache.kafka.raft.QuorumConfig
34+
import org.apache.kafka.raft.{Endpoints, MetadataLogConfig, QuorumConfig}
3635
import org.apache.kafka.server.ProcessRole
3736
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
3837
import org.apache.kafka.server.fault.FaultHandler
@@ -58,7 +57,7 @@ class RaftManagerTest {
5857
props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, value.toString)
5958
}
6059
metadataDir.foreach { value =>
61-
props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, value.toString)
60+
props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, value.toString)
6261
}
6362
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, processRoles.mkString(","))
6463
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.apache.kafka.common.internals.Plugin
3131
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
3232
import org.apache.kafka.common.network.ListenerName
3333
import org.apache.kafka.common.security.auth.SecurityProtocol
34-
import org.apache.kafka.raft.QuorumConfig
34+
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
3535
import org.apache.kafka.network.SocketServerConfigs
3636
import org.apache.kafka.server.DynamicThreadPool
3737
import org.apache.kafka.server.authorizer._
@@ -673,11 +673,11 @@ class DynamicBrokerConfigTest {
673673
@Test
674674
def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
675675
val props = TestUtils.createBrokerConfig(0, port = 8181)
676-
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
676+
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
677677
val config = new KafkaConfig(props)
678-
assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
678+
assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
679679
config.updateCurrentConfig(new KafkaConfig(props))
680-
assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
680+
assertFalse(config.nonInternalValues.containsKey(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
681681
}
682682

683683
@Test

0 commit comments

Comments
 (0)