Skip to content

Commit 5f2a68b

Browse files
authored
KAFKA-19119 Move ApiVersionManager/SimpleApiVersionManager to server (#19426)
Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 80b209d commit 5f2a68b

20 files changed

+345
-249
lines changed

Diff for: checkstyle/import-control-server.xml

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
<allow pkg="javax.crypto" />
8686
<allow pkg="org.apache.kafka.server" />
8787
<allow pkg="org.apache.kafka.image" />
88+
<allow pkg="org.apache.kafka.network.metrics" />
8889
<allow pkg="org.apache.kafka.storage.internals.log" />
8990
<allow pkg="org.apache.kafka.storage.internals.checkpoint" />
9091
<subpackage name="metrics">

Diff for: core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import kafka.coordinator.transaction.TransactionCoordinator;
2121
import kafka.network.RequestChannel;
22-
import kafka.server.ApiVersionManager;
2322
import kafka.server.AutoTopicCreationManager;
2423
import kafka.server.FetchManager;
2524
import kafka.server.ForwardingManager;
@@ -36,6 +35,7 @@
3635
import org.apache.kafka.coordinator.share.ShareCoordinator;
3736
import org.apache.kafka.metadata.ConfigRepository;
3837
import org.apache.kafka.metadata.MetadataCache;
38+
import org.apache.kafka.server.ApiVersionManager;
3939
import org.apache.kafka.server.ClientMetricsManager;
4040
import org.apache.kafka.server.DelegationTokenManager;
4141
import org.apache.kafka.server.authorizer.Authorizer;

Diff for: core/src/main/scala/kafka/network/SocketServer.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic._
2828
import kafka.network.Processor._
2929
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
3030
import kafka.network.SocketServer._
31-
import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
31+
import kafka.server.{BrokerReconfigurable, KafkaConfig}
3232
import org.apache.kafka.network.EndPoint
3333
import org.apache.kafka.common.message.ApiMessageType.ListenerType
3434
import kafka.utils._
@@ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
4646
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
4747
import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
4848
import org.apache.kafka.security.CredentialProvider
49-
import org.apache.kafka.server.ServerSocketFactory
49+
import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
5050
import org.apache.kafka.server.config.QuotaConfig
5151
import org.apache.kafka.server.metrics.KafkaMetricsGroup
5252
import org.apache.kafka.server.network.ConnectionDisconnectListener
@@ -872,7 +872,7 @@ private[kafka] class Processor(
872872
credentialProvider.tokenCache,
873873
time,
874874
logContext,
875-
version => apiVersionManager.apiVersionResponse(throttleTimeMs = 0, version < 4)
875+
version => apiVersionManager.apiVersionResponse(0, version < 4)
876876
)
877877
)
878878

Diff for: core/src/main/scala/kafka/server/ApiVersionManager.scala

-176
This file was deleted.

Diff for: core/src/main/scala/kafka/server/BrokerServer.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
5454
import org.apache.kafka.server.share.session.ShareSessionCache
5555
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
5656
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
57-
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
57+
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
5858
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
5959
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
6060

@@ -252,13 +252,13 @@ class BrokerServer(
252252
forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager, metrics)
253253
clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time, metrics)
254254

255-
val apiVersionManager = ApiVersionManager(
255+
val apiVersionManager = new DefaultApiVersionManager(
256256
ListenerType.BROKER,
257-
config,
258-
forwardingManager,
257+
() => forwardingManager.controllerApiVersions,
259258
brokerFeatures,
260259
metadataCache,
261-
Some(clientMetricsManager)
260+
config.unstableApiVersionsEnabled,
261+
Optional.of(clientMetricsManager)
262262
)
263263

264264
val connectionDisconnectListeners = Seq(clientMetricsManager.connectionDisconnectListener())

Diff for: core/src/main/scala/kafka/server/ControllerApis.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
5656
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
5757
import org.apache.kafka.common.security.auth.KafkaPrincipal
5858
import org.apache.kafka.common.security.auth.SecurityProtocol
59-
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
59+
import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole}
6060
import org.apache.kafka.server.authorizer.Authorizer
6161
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
6262

Diff for: core/src/main/scala/kafka/server/ControllerServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
4040
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
4141
import org.apache.kafka.raft.QuorumConfig
4242
import org.apache.kafka.security.CredentialProvider
43-
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
43+
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
4444
import org.apache.kafka.server.authorizer.Authorizer
4545
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
4646
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}

Diff for: core/src/main/scala/kafka/server/ForwardingManager.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors
2727
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
2828
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
2929

30+
import java.util.Optional
3031
import java.util.concurrent.TimeUnit
3132
import scala.jdk.OptionConverters.RichOptional
3233

@@ -85,7 +86,7 @@ trait ForwardingManager {
8586
responseCallback: Option[AbstractResponse] => Unit
8687
): Unit
8788

88-
def controllerApiVersions: Option[NodeApiVersions]
89+
def controllerApiVersions: Optional[NodeApiVersions]
8990
}
9091

9192
object ForwardingManager {
@@ -187,8 +188,8 @@ class ForwardingManagerImpl(
187188
override def close(): Unit =
188189
forwardingManagerMetrics.close()
189190

190-
override def controllerApiVersions: Option[NodeApiVersions] =
191-
channelManager.controllerApiVersions.toScala
191+
override def controllerApiVersions: Optional[NodeApiVersions] =
192+
channelManager.controllerApiVersions
192193

193194
private def parseResponse(
194195
buffer: ByteBuffer,

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
6060
import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator}
6161
import org.apache.kafka.coordinator.share.ShareCoordinator
6262
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
63-
import org.apache.kafka.server.{ClientMetricsManager, DelegationTokenManager, ProcessRole}
63+
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, DelegationTokenManager, ProcessRole}
6464
import org.apache.kafka.server.authorizer._
6565
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion}
6666
import org.apache.kafka.server.config.DelegationTokenManagerConfigs

Diff for: core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package kafka.tools
1919

2020
import kafka.network.RequestChannel
2121
import kafka.raft.RaftManager
22-
import kafka.server.{ApiRequestHandler, ApiVersionManager}
22+
import kafka.server.ApiRequestHandler
2323
import kafka.utils.Logging
2424
import org.apache.kafka.common.internals.FatalExitError
2525
import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData, VoteResponseData}
2626
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage}
2727
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse, FetchSnapshotResponse, VoteResponse}
2828
import org.apache.kafka.common.utils.Time
29+
import org.apache.kafka.server.ApiVersionManager
2930
import org.apache.kafka.server.common.RequestLocal
3031

3132
/**
@@ -65,7 +66,7 @@ class TestRaftRequestHandler(
6566
}
6667

6768
private def handleApiVersions(request: RequestChannel.Request): Unit = {
68-
requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0, request.header.apiVersion() < 4), None)
69+
requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(0, request.header.apiVersion() < 4), None)
6970
}
7071

7172
private def handleVote(request: RequestChannel.Request): Unit = {

Diff for: core/src/main/scala/kafka/tools/TestRaftServer.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDe
2323
import joptsimple.{OptionException, OptionSpec}
2424
import kafka.network.SocketServer
2525
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
26-
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager}
26+
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool}
2727
import kafka.utils.{CoreUtils, Logging}
2828
import org.apache.kafka.common.errors.InvalidConfigurationException
2929
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,6 +38,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
3838
import org.apache.kafka.raft.errors.NotLeaderException
3939
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
4040
import org.apache.kafka.security.CredentialProvider
41+
import org.apache.kafka.server.SimpleApiVersionManager
4142
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
4243
import org.apache.kafka.server.common.serialization.RecordSerde
4344
import org.apache.kafka.server.config.KRaftConfigs

0 commit comments

Comments
 (0)