Skip to content

Commit 321a380

Browse files
authored
KAFKA-14523: Decouple RemoteLogManager and Partition (#19391)
Remove the last dependency in the core module. Reviewers: Luke Chen <[email protected]>, PoAn Yang <[email protected]>
1 parent d183cf9 commit 321a380

File tree

8 files changed

+79
-36
lines changed

8 files changed

+79
-36
lines changed

Diff for: core/src/main/java/kafka/log/remote/RemoteLogManager.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package kafka.log.remote;
1818

19-
import kafka.cluster.Partition;
20-
2119
import org.apache.kafka.common.Endpoint;
2220
import org.apache.kafka.common.KafkaException;
2321
import org.apache.kafka.common.TopicIdPartition;
@@ -47,6 +45,7 @@
4745
import org.apache.kafka.server.common.OffsetAndEpoch;
4846
import org.apache.kafka.server.common.StopPartition;
4947
import org.apache.kafka.server.config.ServerConfigs;
48+
import org.apache.kafka.server.log.remote.TopicPartitionLog;
5049
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
5150
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
5251
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
@@ -426,9 +425,9 @@ public RemoteStorageManager storageManager() {
426425
return remoteStorageManager;
427426
}
428427

429-
private Stream<Partition> filterPartitions(Set<Partition> partitions) {
428+
private Stream<TopicPartitionLog> filterPartitions(Set<TopicPartitionLog> partitions) {
430429
// We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that.
431-
return partitions.stream().filter(partition -> partition.log().exists(UnifiedLog::remoteLogEnabled));
430+
return partitions.stream().filter(partition -> partition.unifiedLog().isPresent() && partition.unifiedLog().get().remoteLogEnabled());
432431
}
433432

434433
private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
@@ -448,8 +447,8 @@ private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
448447
* @param partitionsBecomeFollower partitions that have become followers on this broker.
449448
* @param topicIds topic name to topic id mappings.
450449
*/
451-
public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
452-
Set<Partition> partitionsBecomeFollower,
450+
public void onLeadershipChange(Set<TopicPartitionLog> partitionsBecomeLeader,
451+
Set<TopicPartitionLog> partitionsBecomeFollower,
453452
Map<String, Uuid> topicIds) {
454453
LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
455454

@@ -458,12 +457,12 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
458457
}
459458

460459
Map<TopicIdPartition, Boolean> leaderPartitions = filterPartitions(partitionsBecomeLeader)
461-
.collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
462-
p -> p.log().exists(log -> log.config().remoteLogCopyDisable())));
460+
.collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topicPartition().topic()), p.topicPartition()),
461+
p -> p.unifiedLog().isPresent() ? p.unifiedLog().get().config().remoteLogCopyDisable() : false));
463462

464463
Map<TopicIdPartition, Boolean> followerPartitions = filterPartitions(partitionsBecomeFollower)
465-
.collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
466-
p -> p.log().exists(log -> log.config().remoteLogCopyDisable())));
464+
.collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topicPartition().topic()), p.topicPartition()),
465+
p -> p.unifiedLog().isPresent() ? p.unifiedLog().get().config().remoteLogCopyDisable() : false));
467466

468467
if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) {
469468
LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}",
@@ -483,8 +482,8 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
483482
}
484483
}
485484

486-
public void stopLeaderCopyRLMTasks(Set<Partition> partitions) {
487-
for (Partition partition : partitions) {
485+
public void stopLeaderCopyRLMTasks(Set<TopicPartitionLog> partitions) {
486+
for (TopicPartitionLog partition : partitions) {
488487
TopicPartition tp = partition.topicPartition();
489488
if (topicIdByPartitionMap.containsKey(tp)) {
490489
TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);

Diff for: core/src/main/scala/kafka/cluster/Partition.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
4040
import org.apache.kafka.common.utils.Time
4141
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
4242
import org.apache.kafka.server.common.RequestLocal
43+
import org.apache.kafka.server.log.remote.TopicPartitionLog
4344
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
4445
import org.apache.kafka.server.metrics.KafkaMetricsGroup
4546
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
@@ -314,7 +315,7 @@ class Partition(val topicPartition: TopicPartition,
314315
logManager: LogManager,
315316
alterIsrManager: AlterPartitionManager,
316317
@volatile private var _topicId: Option[Uuid] = None // TODO: merge topicPartition and _topicId into TopicIdPartition once TopicId persist in most of the code by KAFKA-16212
317-
) extends Logging {
318+
) extends Logging with TopicPartitionLog {
318319

319320
import Partition.metricsGroup
320321
def topic: String = topicPartition.topic
@@ -368,6 +369,8 @@ class Partition(val topicPartition: TopicPartition,
368369
metricsGroup.newGauge("ReplicasCount", () => if (isLeader) assignmentState.replicationFactor else 0, tags)
369370
metricsGroup.newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)
370371

372+
def unifiedLog(): Optional[UnifiedLog] = log.toJava
373+
371374
def hasLateTransaction(currentTimeMs: Long): Boolean = leaderLogIfLocal.exists(_.hasLateTransaction(currentTimeMs))
372375

373376
def isUnderReplicated: Boolean = isLeader && (assignmentState.replicationFactor - partitionState.isr.size) > 0

Diff for: core/src/main/scala/kafka/server/ConfigHandler.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.Quota._
2525
import org.apache.kafka.coordinator.group.GroupCoordinator
2626
import org.apache.kafka.server.ClientMetricsManager
2727
import org.apache.kafka.server.common.StopPartition
28+
import org.apache.kafka.server.log.remote.TopicPartitionLog
2829
import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, ThrottledReplicaListValidator, UnifiedLog}
2930

3031
import scala.jdk.CollectionConverters._
@@ -53,7 +54,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
5354
val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
5455
val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
5556

56-
logManager.updateTopicConfig(topic, topicConfig, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
57+
logManager.updateTopicConfig(topic, topicConfig, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled,
5758
wasRemoteLogEnabled)
5859
maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled)
5960
}
@@ -74,13 +75,13 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
7475
if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled && !isCopyDisabled))) {
7576
val topicIds = Collections.singletonMap(topic, replicaManager.metadataCache.getTopicId(topic))
7677
replicaManager.remoteLogManager.foreach(rlm =>
77-
rlm.onLeadershipChange(leaderPartitions.toSet.asJava, followerPartitions.toSet.asJava, topicIds))
78+
rlm.onLeadershipChange((leaderPartitions.toSet: Set[TopicPartitionLog]).asJava, (followerPartitions.toSet: Set[TopicPartitionLog]).asJava, topicIds))
7879
}
7980

8081
// When copy disabled, we should stop leaderCopyRLMTask, but keep expirationTask
8182
if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
8283
replicaManager.remoteLogManager.foreach(rlm => {
83-
rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava)
84+
rlm.stopLeaderCopyRLMTasks((leaderPartitions.toSet: Set[TopicPartitionLog] ).asJava)
8485
})
8586
}
8687

Diff for: core/src/main/scala/kafka/server/ReplicaManager.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
5252
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5353
import org.apache.kafka.metadata.MetadataCache
5454
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
55+
import org.apache.kafka.server.log.remote.TopicPartitionLog
5556
import org.apache.kafka.server.metrics.KafkaMetricsGroup
5657
import org.apache.kafka.server.network.BrokerEndPoint
5758
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
@@ -2168,7 +2169,7 @@ class ReplicaManager(val config: KafkaConfig,
21682169
replicaFetcherManager.shutdownIdleFetcherThreads()
21692170
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
21702171

2171-
remoteLogManager.foreach(rlm => rlm.onLeadershipChange(partitionsBecomeLeader.asJava, partitionsBecomeFollower.asJava, topicIds))
2172+
remoteLogManager.foreach(rlm => rlm.onLeadershipChange((partitionsBecomeLeader.toSet: Set[TopicPartitionLog]).asJava, (partitionsBecomeFollower.toSet: Set[TopicPartitionLog]).asJava, topicIds))
21722173

21732174
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
21742175

@@ -2811,7 +2812,7 @@ class ReplicaManager(val config: KafkaConfig,
28112812
replicaFetcherManager.shutdownIdleFetcherThreads()
28122813
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
28132814

2814-
remoteLogManager.foreach(rlm => rlm.onLeadershipChange(leaderChangedPartitions.asJava, followerChangedPartitions.asJava, localChanges.topicIds()))
2815+
remoteLogManager.foreach(rlm => rlm.onLeadershipChange((leaderChangedPartitions.toSet: Set[TopicPartitionLog]).asJava, (followerChangedPartitions.toSet: Set[TopicPartitionLog]).asJava, localChanges.topicIds()))
28152816
}
28162817

28172818
if (metadataVersion.isDirectoryAssignmentSupported) {

Diff for: core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java

+11-14
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package kafka.log.remote;
1818

19-
import kafka.cluster.Partition;
2019
import kafka.server.KafkaConfig;
2120

2221
import org.apache.kafka.common.Endpoint;
@@ -40,6 +39,7 @@
4039
import org.apache.kafka.server.common.OffsetAndEpoch;
4140
import org.apache.kafka.server.common.StopPartition;
4241
import org.apache.kafka.server.config.ServerConfigs;
42+
import org.apache.kafka.server.log.remote.TopicPartitionLog;
4343
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
4444
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
4545
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
@@ -131,8 +131,6 @@
131131
import java.util.function.Supplier;
132132
import java.util.stream.Collectors;
133133

134-
import scala.Option;
135-
136134
import static kafka.log.remote.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs;
137135
import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
138136
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
@@ -857,8 +855,8 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
857855
return null;
858856
}).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
859857

860-
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
861-
Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
858+
TopicPartitionLog mockLeaderPartition = mockPartition(leaderTopicIdPartition);
859+
TopicPartitionLog mockFollowerPartition = mockPartition(followerTopicIdPartition);
862860
List<RemoteLogSegmentMetadata> list = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
863861
// return the metadataList 3 times, then return empty list to simulate all segments are deleted
864862
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(list.iterator()).thenReturn(Collections.emptyIterator());
@@ -973,7 +971,7 @@ void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws
973971
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
974972
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
975973

976-
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
974+
TopicPartitionLog mockLeaderPartition = mockPartition(leaderTopicIdPartition);
977975
List<RemoteLogSegmentMetadata> metadataList = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
978976
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(metadataList.iterator());
979977
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenReturn(metadataList.iterator()).thenReturn(metadataList.iterator());
@@ -1117,7 +1115,7 @@ void testRemoteLogManagerRemoteMetrics() throws Exception {
11171115
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
11181116
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
11191117

1120-
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
1118+
TopicPartitionLog mockLeaderPartition = mockPartition(leaderTopicIdPartition);
11211119

11221120
// This method is called by both Copy and Expiration task. On the first call, both tasks should see 175 bytes as
11231121
// the local log segments size
@@ -1385,8 +1383,8 @@ private void verifyNotInCache(TopicIdPartition... topicIdPartitions) {
13851383
@Test
13861384
void testTopicIdCacheUpdates() throws RemoteStorageException {
13871385
remoteLogManager.startup();
1388-
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
1389-
Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
1386+
TopicPartitionLog mockLeaderPartition = mockPartition(leaderTopicIdPartition);
1387+
TopicPartitionLog mockFollowerPartition = mockPartition(followerTopicIdPartition);
13901388

13911389
when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(TopicIdPartition.class), anyInt(), anyLong()))
13921390
.thenReturn(Optional.empty());
@@ -1721,7 +1719,7 @@ void testFetchOffsetByTimestampWithTieredStorageDoesNotFetchIndexWhenExistsLocal
17211719
// 9999 -> refers to read from local, 999 -> refers to read from remote
17221720
FileRecords.TimestampAndOffset expectedLocalResult = new FileRecords.TimestampAndOffset(timestamp + 9999, 9999, Optional.of(Integer.MAX_VALUE));
17231721
FileRecords.TimestampAndOffset expectedRemoteResult = new FileRecords.TimestampAndOffset(timestamp + 999, 999, Optional.of(Integer.MAX_VALUE));
1724-
Partition mockFollowerPartition = mockPartition(tpId);
1722+
TopicPartitionLog mockFollowerPartition = mockPartition(tpId);
17251723

17261724
LogSegment logSegmentBaseOffset50 = mockLogSegment(50L, timestamp, null);
17271725
LogSegment logSegmentBaseOffset100 = mockLogSegment(100L, timestamp + 1, expectedLocalResult);
@@ -3760,14 +3758,13 @@ private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch)
37603758
}
37613759
}
37623760

3763-
private Partition mockPartition(TopicIdPartition topicIdPartition) {
3761+
private TopicPartitionLog mockPartition(TopicIdPartition topicIdPartition) {
37643762
TopicPartition tp = topicIdPartition.topicPartition();
3765-
Partition partition = mock(Partition.class);
3763+
TopicPartitionLog partition = mock(TopicPartitionLog.class);
37663764
UnifiedLog log = mock(UnifiedLog.class);
37673765
when(partition.topicPartition()).thenReturn(tp);
3768-
when(partition.topic()).thenReturn(tp.topic());
37693766
when(log.remoteLogEnabled()).thenReturn(true);
3770-
when(partition.log()).thenReturn(Option.apply(log));
3767+
when(partition.unifiedLog()).thenReturn(Optional.of(log));
37713768
when(log.config()).thenReturn(new LogConfig(new Properties()));
37723769
return partition;
37733770
}

Diff for: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
3535
import org.apache.kafka.coordinator.group.GroupConfig
3636
import org.apache.kafka.metadata.MetadataCache
3737
import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
38+
import org.apache.kafka.server.log.remote.TopicPartitionLog
3839
import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
3940
import org.apache.kafka.test.TestUtils.assertFutureThrows
4041
import org.junit.jupiter.api.Assertions._
@@ -574,8 +575,8 @@ class DynamicConfigChangeUnitTest {
574575
when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1))
575576
when(log1.config).thenReturn(new LogConfig(Collections.emptyMap()))
576577

577-
val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]])
578-
val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]])
578+
val leaderPartitionsArg: ArgumentCaptor[util.Set[TopicPartitionLog]] = ArgumentCaptor.forClass(classOf[util.Set[TopicPartitionLog]])
579+
val followerPartitionsArg: ArgumentCaptor[util.Set[TopicPartitionLog]] = ArgumentCaptor.forClass(classOf[util.Set[TopicPartitionLog]])
579580
doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(), followerPartitionsArg.capture(), any())
580581

581582
val isRemoteLogEnabledBeforeUpdate = false

Diff for: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
6060
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
6161
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
6262
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
63+
import org.apache.kafka.server.log.remote.TopicPartitionLog
6364
import org.apache.kafka.server.log.remote.storage._
6465
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
6566
import org.apache.kafka.server.network.BrokerEndPoint
@@ -4535,8 +4536,8 @@ class ReplicaManagerTest {
45354536
}
45364537

45374538
private def verifyRLMOnLeadershipChange(leaderPartitions: util.Set[Partition], followerPartitions: util.Set[Partition]): Unit = {
4538-
val leaderCapture: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]])
4539-
val followerCapture: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]])
4539+
val leaderCapture: ArgumentCaptor[util.Set[TopicPartitionLog]] = ArgumentCaptor.forClass(classOf[util.Set[TopicPartitionLog]])
4540+
val followerCapture: ArgumentCaptor[util.Set[TopicPartitionLog]] = ArgumentCaptor.forClass(classOf[util.Set[TopicPartitionLog]])
45404541
val topicIdsCapture: ArgumentCaptor[util.Map[String, Uuid]] = ArgumentCaptor.forClass(classOf[util.Map[String, Uuid]])
45414542
verify(mockRemoteLogManager).onLeadershipChange(leaderCapture.capture(), followerCapture.capture(), topicIdsCapture.capture())
45424543

0 commit comments

Comments
 (0)