Skip to content

Commit 121ec2a

Browse files
authored
KAFKA-15599 Move MetadataLogConfig to raft module (#19246)
Rewrite the class in Java and move it to the raft module. Reviewers: PoAn Yang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 1c582a4 commit 121ec2a

File tree

7 files changed

+154
-115
lines changed

7 files changed

+154
-115
lines changed

checkstyle/import-control.xml

+1
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@
488488
<allow pkg="org.apache.kafka.common.requests" />
489489
<allow pkg="org.apache.kafka.server.common" />
490490
<allow pkg="org.apache.kafka.server.common.serialization" />
491+
<allow pkg="org.apache.kafka.server.config" />
491492
<allow pkg="org.apache.kafka.server.fault"/>
492493
<allow pkg="org.apache.kafka.server.util" />
493494
<allow pkg="org.apache.kafka.test"/>

core/src/main/scala/kafka/MetadataLogConfig.scala

-48
This file was deleted.

core/src/main/scala/kafka/raft/KafkaMetadataLog.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
2828
import org.apache.kafka.common.record.{MemoryRecords, Records}
2929
import org.apache.kafka.common.utils.{Time, Utils}
3030
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
31-
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
31+
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
3232
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
3333
import org.apache.kafka.server.storage.log.FetchIsolation
3434
import org.apache.kafka.server.util.Scheduler
@@ -554,7 +554,7 @@ final class KafkaMetadataLog private (
554554
scheduler.scheduleOnce(
555555
"delete-snapshot-files",
556556
() => KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots, this),
557-
config.fileDeleteDelayMs
557+
config.deleteDelayMillis
558558
)
559559
}
560560
}

core/src/main/scala/kafka/raft/RaftManager.scala

+11-2
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ import org.apache.kafka.common.requests.RequestHeader
4141
import org.apache.kafka.common.security.JaasContext
4242
import org.apache.kafka.common.security.auth.SecurityProtocol
4343
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
44-
import org.apache.kafka.raft.{ExternalKRaftMetrics, Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
44+
import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, MetadataLogConfig, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
4545
import org.apache.kafka.server.ProcessRole
4646
import org.apache.kafka.server.common.Feature
4747
import org.apache.kafka.server.common.serialization.RecordSerde
48+
import org.apache.kafka.server.config.ServerLogConfigs
4849
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
4950
import org.apache.kafka.server.fault.FaultHandler
5051
import org.apache.kafka.server.util.timer.SystemTimer
@@ -230,7 +231,15 @@ class KafkaRaftManager[T](
230231
dataDir,
231232
time,
232233
scheduler,
233-
config = MetadataLogConfig(config, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
234+
config = new MetadataLogConfig(config.metadataLogSegmentBytes,
235+
config.metadataLogSegmentMinBytes,
236+
config.metadataLogSegmentMillis,
237+
config.metadataRetentionBytes,
238+
config.metadataRetentionMillis,
239+
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
240+
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
241+
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
242+
config.metadataNodeIDConfig)
234243
)
235244
}
236245

core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala

+88-51
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.kafka.common.record.ArbitraryMemoryRecords
2727
import org.apache.kafka.common.record.InvalidMemoryRecordsProvider
2828
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
2929
import org.apache.kafka.common.utils.Utils
30-
import org.apache.kafka.raft._
30+
import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, QuorumConfig, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
3131
import org.apache.kafka.raft.internals.BatchBuilder
3232
import org.apache.kafka.server.common.serialization.RecordSerde
3333
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
@@ -80,13 +80,31 @@ final class KafkaMetadataLogTest {
8080
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
8181
assertThrows(classOf[InvalidConfigurationException], () => {
8282
val kafkaConfig = KafkaConfig.fromProps(props)
83-
val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
83+
val metadataConfig = new MetadataLogConfig(
84+
kafkaConfig.metadataLogSegmentBytes,
85+
kafkaConfig.metadataLogSegmentMinBytes,
86+
kafkaConfig.metadataLogSegmentMillis,
87+
kafkaConfig.metadataRetentionBytes,
88+
kafkaConfig.metadataRetentionMillis,
89+
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
90+
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
91+
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
92+
kafkaConfig.metadataNodeIDConfig)
8493
buildMetadataLog(tempDir, mockTime, metadataConfig)
8594
})
8695

8796
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
8897
val kafkaConfig = KafkaConfig.fromProps(props)
89-
val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
98+
val metadataConfig = new MetadataLogConfig(
99+
kafkaConfig.metadataLogSegmentBytes,
100+
kafkaConfig.metadataLogSegmentMinBytes,
101+
kafkaConfig.metadataLogSegmentMillis,
102+
kafkaConfig.metadataRetentionBytes,
103+
kafkaConfig.metadataRetentionMillis,
104+
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
105+
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
106+
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
107+
kafkaConfig.metadataNodeIDConfig)
90108
buildMetadataLog(tempDir, mockTime, metadataConfig)
91109
}
92110

@@ -129,8 +147,8 @@ final class KafkaMetadataLogTest {
129147
def testEmptyAppendNotAllowed(): Unit = {
130148
val log = buildMetadataLog(tempDir, mockTime)
131149

132-
assertThrows(classOf[IllegalArgumentException], () => log.appendAsFollower(MemoryRecords.EMPTY, 1));
133-
assertThrows(classOf[IllegalArgumentException], () => log.appendAsLeader(MemoryRecords.EMPTY, 1));
150+
assertThrows(classOf[IllegalArgumentException], () => log.appendAsFollower(MemoryRecords.EMPTY, 1))
151+
assertThrows(classOf[IllegalArgumentException], () => log.appendAsLeader(MemoryRecords.EMPTY, 1))
134152
}
135153

136154
@ParameterizedTest
@@ -140,7 +158,7 @@ final class KafkaMetadataLogTest {
140158
val previousEndOffset = log.endOffset().offset()
141159

142160
val action: Executable = () => log.appendAsFollower(records, Int.MaxValue)
143-
if (expectedException.isPresent()) {
161+
if (expectedException.isPresent) {
144162
assertThrows(expectedException.get, action)
145163
} else {
146164
assertThrows(classOf[CorruptRecordException], action)
@@ -478,7 +496,7 @@ final class KafkaMetadataLogTest {
478496
assertEquals(log.earliestSnapshotId(), log.latestSnapshotId())
479497
log.close()
480498

481-
mockTime.sleep(config.fileDeleteDelayMs)
499+
mockTime.sleep(config.deleteDelayMillis)
482500
// Assert that the log dir doesn't contain any older snapshots
483501
Files
484502
.walk(logDir, 1)
@@ -649,7 +667,7 @@ final class KafkaMetadataLogTest {
649667
assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get)
650668
assertEquals(3 * numberOfRecords, secondLog.startOffset)
651669
assertEquals(epoch, secondLog.lastFetchedEpoch)
652-
mockTime.sleep(config.fileDeleteDelayMs)
670+
mockTime.sleep(config.deleteDelayMillis)
653671

654672
// Assert that the log dir doesn't contain any older snapshots
655673
Files
@@ -687,7 +705,18 @@ final class KafkaMetadataLogTest {
687705
val leaderEpoch = 5
688706
val maxBatchSizeInBytes = 16384
689707
val recordSize = 64
690-
val log = buildMetadataLog(tempDir, mockTime, DefaultMetadataLogConfig.copy(maxBatchSizeInBytes = maxBatchSizeInBytes))
708+
val config = new MetadataLogConfig(
709+
DefaultMetadataLogConfig.logSegmentBytes,
710+
DefaultMetadataLogConfig.logSegmentMinBytes,
711+
DefaultMetadataLogConfig.logSegmentMillis,
712+
DefaultMetadataLogConfig.retentionMaxBytes,
713+
DefaultMetadataLogConfig.retentionMillis,
714+
maxBatchSizeInBytes,
715+
DefaultMetadataLogConfig.maxFetchSizeInBytes,
716+
DefaultMetadataLogConfig.deleteDelayMillis,
717+
DefaultMetadataLogConfig.nodeId
718+
)
719+
val log = buildMetadataLog(tempDir, mockTime, config)
691720

692721
val oversizeBatch = buildFullBatch(leaderEpoch, recordSize, maxBatchSizeInBytes + recordSize)
693722
assertThrows(classOf[RecordTooLargeException], () => {
@@ -897,18 +926,17 @@ final class KafkaMetadataLogTest {
897926

898927
@Test
899928
def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
900-
val config = MetadataLogConfig(
901-
logSegmentBytes = 512,
902-
logSegmentMinBytes = 512,
903-
logSegmentMillis = 10 * 1000,
904-
retentionMaxBytes = 256,
905-
retentionMillis = 60 * 1000,
906-
maxBatchSizeInBytes = 512,
907-
maxFetchSizeInBytes = DefaultMetadataLogConfig.maxFetchSizeInBytes,
908-
fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
909-
nodeId = 1
929+
val config = new MetadataLogConfig(
930+
512,
931+
512,
932+
10 * 1000,
933+
256,
934+
60 * 1000,
935+
512,
936+
DefaultMetadataLogConfig.maxFetchSizeInBytes,
937+
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
938+
1
910939
)
911-
config.copy()
912940
val log = buildMetadataLog(tempDir, mockTime, config)
913941

914942
// Generate some segments
@@ -936,13 +964,16 @@ final class KafkaMetadataLogTest {
936964
@Test
937965
def testDeleteSnapshots(): Unit = {
938966
// Generate some logs and a few snapshots, set retention low and verify that cleaning occurs
939-
val config = DefaultMetadataLogConfig.copy(
940-
logSegmentBytes = 1024,
941-
logSegmentMinBytes = 1024,
942-
logSegmentMillis = 10 * 1000,
943-
retentionMaxBytes = 1024,
944-
retentionMillis = 60 * 1000,
945-
maxBatchSizeInBytes = 100
967+
val config = new MetadataLogConfig(
968+
1024,
969+
1024,
970+
10 * 1000,
971+
1024,
972+
60 * 1000,
973+
100,
974+
DefaultMetadataLogConfig.maxBatchSizeInBytes,
975+
DefaultMetadataLogConfig.maxFetchSizeInBytes,
976+
DefaultMetadataLogConfig.nodeId
946977
)
947978
val log = buildMetadataLog(tempDir, mockTime, config)
948979

@@ -968,13 +999,16 @@ final class KafkaMetadataLogTest {
968999
@Test
9691000
def testSoftRetentionLimit(): Unit = {
9701001
// Set retention equal to the segment size and generate slightly more than one segment of logs
971-
val config = DefaultMetadataLogConfig.copy(
972-
logSegmentBytes = 10240,
973-
logSegmentMinBytes = 10240,
974-
logSegmentMillis = 10 * 1000,
975-
retentionMaxBytes = 10240,
976-
retentionMillis = 60 * 1000,
977-
maxBatchSizeInBytes = 100
1002+
val config = new MetadataLogConfig(
1003+
10240,
1004+
10240,
1005+
10 * 1000,
1006+
10240,
1007+
60 * 1000,
1008+
100,
1009+
DefaultMetadataLogConfig.maxFetchSizeInBytes,
1010+
DefaultMetadataLogConfig.deleteDelayMillis,
1011+
DefaultMetadataLogConfig.nodeId
9781012
)
9791013
val log = buildMetadataLog(tempDir, mockTime, config)
9801014

@@ -1010,13 +1044,16 @@ final class KafkaMetadataLogTest {
10101044

10111045
@Test
10121046
def testSegmentsLessThanLatestSnapshot(): Unit = {
1013-
val config = DefaultMetadataLogConfig.copy(
1014-
logSegmentBytes = 10240,
1015-
logSegmentMinBytes = 10240,
1016-
logSegmentMillis = 10 * 1000,
1017-
retentionMaxBytes = 10240,
1018-
retentionMillis = 60 * 1000,
1019-
maxBatchSizeInBytes = 200
1047+
val config = new MetadataLogConfig(
1048+
10240,
1049+
10240,
1050+
10 * 1000,
1051+
10240,
1052+
60 * 1000,
1053+
200,
1054+
DefaultMetadataLogConfig.maxFetchSizeInBytes,
1055+
DefaultMetadataLogConfig.deleteDelayMillis,
1056+
DefaultMetadataLogConfig.nodeId
10201057
)
10211058
val log = buildMetadataLog(tempDir, mockTime, config)
10221059

@@ -1067,16 +1104,16 @@ object KafkaMetadataLogTest {
10671104
override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
10681105
}
10691106

1070-
val DefaultMetadataLogConfig = MetadataLogConfig(
1071-
logSegmentBytes = 100 * 1024,
1072-
logSegmentMinBytes = 100 * 1024,
1073-
logSegmentMillis = 10 * 1000,
1074-
retentionMaxBytes = 100 * 1024,
1075-
retentionMillis = 60 * 1000,
1076-
maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
1077-
maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
1078-
fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1079-
nodeId = 1
1107+
val DefaultMetadataLogConfig = new MetadataLogConfig(
1108+
100 * 1024,
1109+
100 * 1024,
1110+
10 * 1000,
1111+
100 * 1024,
1112+
60 * 1000,
1113+
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
1114+
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
1115+
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
1116+
1
10801117
)
10811118

10821119
def buildMetadataLogAndDir(

core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala

+12-12
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import java.util.Optional
2525
import java.util.Properties
2626
import java.util.stream.IntStream
2727
import kafka.log.LogTestUtils
28-
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
28+
import kafka.raft.KafkaMetadataLog
2929
import kafka.server.KafkaRaftServer
3030
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors, TransactionLogMessageParser}
3131
import kafka.utils.TestUtils
@@ -43,7 +43,7 @@ import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnap
4343
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
4444
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
4545
import org.apache.kafka.metadata.MetadataRecordSerde
46-
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest}
46+
import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig, OffsetAndEpoch, VoterSetTest}
4747
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
4848
import org.apache.kafka.server.config.ServerLogConfigs
4949
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
@@ -544,16 +544,16 @@ class DumpLogSegmentsTest {
544544
logDir,
545545
time,
546546
time.scheduler,
547-
MetadataLogConfig(
548-
logSegmentBytes = 100 * 1024,
549-
logSegmentMinBytes = 100 * 1024,
550-
logSegmentMillis = 10 * 1000,
551-
retentionMaxBytes = 100 * 1024,
552-
retentionMillis = 60 * 1000,
553-
maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
554-
maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
555-
fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
556-
nodeId = 1
547+
new MetadataLogConfig(
548+
100 * 1024,
549+
100 * 1024,
550+
10 * 1000,
551+
100 * 1024,
552+
60 * 1000,
553+
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
554+
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
555+
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
556+
1
557557
)
558558
)
559559

0 commit comments

Comments
 (0)