Commit 08a93fe

KAFKA-14523: Move DelayedRemoteListOffsets to the storage module (#19285)

Decouple RemoteLogManager and ReplicaManager.

Reviewers: Chia-Ping Tsai <[email protected]>
1 parent d6ff7b8 commit 08a93fe

13 files changed, +492 -455 lines changed


checkstyle/import-control-storage.xml

+4

@@ -49,7 +49,11 @@


   <subpackage name="server">
+    <allow pkg="com.yammer.metrics.core" />
     <allow pkg="org.apache.kafka.common" />
+    <allow pkg="org.apache.kafka.server.metrics" />
+    <allow pkg="org.apache.kafka.server.util.timer" />
+    <allow pkg="org.apache.kafka.storage.internals.log" />

     <subpackage name="log">
       <allow pkg="com.fasterxml.jackson" />

core/src/main/java/kafka/log/remote/RemoteLogManager.java

+1 -1

@@ -17,7 +17,6 @@
 package kafka.log.remote;

 import kafka.cluster.Partition;
-import kafka.server.DelayedRemoteListOffsets;

 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.KafkaException;
@@ -66,6 +65,7 @@
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
+import org.apache.kafka.server.purgatory.DelayedRemoteListOffsets;
 import org.apache.kafka.server.purgatory.TopicPartitionOperationKey;
 import org.apache.kafka.server.quota.QuotaType;
 import org.apache.kafka.server.storage.log.FetchIsolation;

core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala

-168
This file was deleted.
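The deleted Scala implementation is replaced by a Java DelayedRemoteListOffsets in org.apache.kafka.server.purgatory (see the RemoteLogManager and ReplicaManager hunks below). As rough orientation only, the plain-Scala sketch below illustrates the idea behind such a delayed operation: track one pending remote offset lookup per partition and fire a single completion callback once every lookup has a result. All names are invented, no Kafka purgatory classes are used, and the real class additionally handles expiration and error paths.

// Conceptual sketch only, not the relocated class: wait for all per-partition remote
// lookups, then invoke one completion callback. All names here are illustrative.
import java.util.concurrent.CompletableFuture
import java.util.function.Consumer

object DelayedRemoteListOffsetsConceptSketch {
  def main(args: Array[String]): Unit = {
    // One pending remote offset lookup per partition, each eventually yielding an offset.
    val pendingByPartition: Map[String, CompletableFuture[Long]] = Map(
      "topic-a-0" -> CompletableFuture.completedFuture(42L),
      "topic-a-1" -> CompletableFuture.completedFuture(7L))

    // The completion callback fires once, with the merged per-partition results.
    val responseCallback: Consumer[Map[String, Long]] =
      results => println(s"list offsets completed: $results")

    // "tryComplete": the operation finishes only when every partition's lookup is done;
    // a purgatory would otherwise retry later or expire the operation after a timeout.
    if (pendingByPartition.values.forall(_.isDone))
      responseCallback.accept(pendingByPartition.map { case (tp, f) => tp -> f.join() })
  }
}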

core/src/main/scala/kafka/server/KafkaApis.scala

+7 -5

@@ -769,18 +769,20 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setName(topic.name)
           .setPartitions(topic.partitions.asScala.map(partition =>
             buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava)
-    )
+    ).asJava

-    def sendResponseCallback(response: Seq[ListOffsetsTopicResponse]): Unit = {
-      val mergedResponses = response ++ unauthorizedResponseStatus
+    def sendResponseCallback(response: util.Collection[ListOffsetsTopicResponse]): Void = {
+      val mergedResponses = new util.ArrayList(response)
+      mergedResponses.addAll(unauthorizedResponseStatus)
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
         new ListOffsetsResponse(new ListOffsetsResponseData()
           .setThrottleTimeMs(requestThrottleMs)
-          .setTopics(mergedResponses.asJava)))
+          .setTopics(mergedResponses)))
+      null
     }

     if (authorizedRequestInfo.isEmpty) {
-      sendResponseCallback(Seq.empty)
+      sendResponseCallback(util.List.of)
     } else {
       replicaManager.fetchOffset(authorizedRequestInfo, offsetRequest.duplicatePartitions().asScala,
         offsetRequest.isolationLevel(), offsetRequest.replicaId(), clientId, correlationId, version,
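With this change, the callback handed to ReplicaManager.fetchOffset works over a Java collection and can be treated as a java.util.function.Consumer, so the same callback can be invoked from the Java DelayedRemoteListOffsets. The standalone sketch below is not the KafkaApis code; it assumes only kafka-clients on the classpath and the topic names are invented. It shows the shape of such a callback: merge the delivered responses with pre-built unauthorized responses and assemble a ListOffsetsResponseData.

// Minimal sketch of the Consumer-based callback shape, assuming only kafka-clients.
import java.util
import java.util.function.Consumer

import org.apache.kafka.common.message.ListOffsetsResponseData
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse

object ListOffsetsCallbackSketch {
  def main(args: Array[String]): Unit = {
    // Responses prepared up front for topics the caller is not authorized to describe.
    val unauthorizedResponseStatus = util.List.of(new ListOffsetsTopicResponse().setName("denied-topic"))

    // The callback receives a java.util.Collection instead of a Scala Seq,
    // so Java code can invoke it directly via accept().
    val sendResponseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]] = response => {
      val mergedResponses = new util.ArrayList[ListOffsetsTopicResponse](response)
      mergedResponses.addAll(unauthorizedResponseStatus)
      val data = new ListOffsetsResponseData().setThrottleTimeMs(0).setTopics(mergedResponses)
      println(s"would send a ListOffsetsResponse covering ${data.topics().size()} topics")
    }

    // ReplicaManager, or a delayed operation completing later, calls accept().
    sendResponseCallback.accept(util.List.of(new ListOffsetsTopicResponse().setName("granted-topic")))
  }
}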

core/src/main/scala/kafka/server/ReplicaManager.scala

+9 -8

@@ -53,13 +53,13 @@ import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DeleteRecordsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log._
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats

 import java.io.File
@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
 import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
+import java.util.function.Consumer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOption, RichOptional}
@@ -841,7 +842,7 @@ class ReplicaManager(val config: KafkaConfig,
     )

     val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
-    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs
+    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
     val startVerificationTimeMs = time.milliseconds
     def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
       if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
@@ -1470,7 +1471,7 @@ class ReplicaManager(val config: KafkaConfig,
                   correlationId: Int,
                   version: Short,
                   buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse,
-                  responseCallback: List[ListOffsetsTopicResponse] => Unit,
+                  responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]],
                   timeoutMs: Int = 0): Unit = {
     val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]()
     topics.foreach { topic =>
@@ -1569,7 +1570,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (delayedRemoteListOffsetsRequired(statusByPartition)) {
       val delayMs: Long = if (timeoutMs > 0) timeoutMs else config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs()
       // create delayed remote list offsets operation
-      val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition, this, responseCallback)
+      val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition.asJava, tp => getPartitionOrException(tp), responseCallback)
       // create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
       val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList
       // try to complete the request immediately, otherwise put it into the purgatory
@@ -1580,7 +1581,7 @@ class ReplicaManager(val config: KafkaConfig,
         case (topic, status) =>
           new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
       }.toList
-      responseCallback(responseTopics)
+      responseCallback.accept(responseTopics.asJava)
     }
   }

@@ -1899,7 +1900,7 @@ class ReplicaManager(val config: KafkaConfig,
         createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
           new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
       } else {
-        val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs()
+        val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs
        val fetchDataInfo = if (throttleTimeMs > 0) {
           // Record the throttle time for the remote log fetches
           remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds())
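The new call site no longer hands the ReplicaManager itself to the delayed operation: it passes the status map as a java.util.Map (via .asJava), a partition-lookup lambda, and the Consumer callback, which is the decoupling this change is after. The self-contained sketch below only demonstrates those three Scala-to-Java bridges with stand-in types; PartitionStatus, the string return values, and the use of java.util.function.Function are illustrative assumptions, not Kafka classes or the actual constructor signature.

// Sketch of the Scala-to-Java bridging used at the new call site; stand-in types only.
import java.util.function.{Consumer, Function}

import org.apache.kafka.common.TopicPartition

import scala.collection.mutable
import scala.jdk.CollectionConverters._

object DelayedRemoteListOffsetsCallSiteSketch {
  // Stand-in for the per-partition status tracked by the delayed operation.
  final case class PartitionStatus(completed: Boolean)

  def main(args: Array[String]): Unit = {
    val statusByPartition = mutable.Map(new TopicPartition("topic-a", 0) -> PartitionStatus(completed = false))

    // The call site converts the Scala map with .asJava before handing it to Java code.
    val javaStatus: java.util.Map[TopicPartition, PartitionStatus] = statusByPartition.asJava

    // Instead of passing `this` (the ReplicaManager), the caller passes a lookup lambda;
    // a Scala lambda satisfies a Java functional interface (Function is used here for illustration).
    val lookupPartition: Function[TopicPartition, String] = tp => s"partition object for $tp"

    // The completion callback is a java.util.function.Consumer, invoked with accept().
    val responseCallback: Consumer[java.util.Collection[String]] =
      responses => println(s"completed with ${responses.size()} topic responses")

    println(javaStatus)
    println(lookupPartition.apply(new TopicPartition("topic-a", 0)))
    responseCallback.accept(java.util.List.of("topic-a response"))
  }
}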
