Skip to content

Commit 8827ce4

Browse files
KAFKA-19113: Migrate DelegationTokenManager to server module (#19424)
1. Migrate DelegationTokenManager to server module. 2. Rewrite DelegationTokenManager in Java. 3. Move DelegationTokenManagerConfigs out of KafkaConfig. Reviewers: Mickael Maison <[email protected]>

1 parent dced8bf commit 8827ce4

File tree

13 files changed

+266
-182
lines changed

13 files changed

+266
-182
lines changed

Diff for: checkstyle/import-control-server.xml

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
<allow pkg="org.apache.kafka.raft" />
8383

8484
<subpackage name="server">
85+
<allow pkg="javax.crypto" />
8586
<allow pkg="org.apache.kafka.server" />
8687
<allow pkg="org.apache.kafka.image" />
8788
<allow pkg="org.apache.kafka.storage.internals.log" />

Diff for: core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import kafka.network.RequestChannel;
2222
import kafka.server.ApiVersionManager;
2323
import kafka.server.AutoTopicCreationManager;
24-
import kafka.server.DelegationTokenManager;
2524
import kafka.server.FetchManager;
2625
import kafka.server.ForwardingManager;
2726
import kafka.server.KafkaApis;
@@ -38,6 +37,7 @@
3837
import org.apache.kafka.metadata.ConfigRepository;
3938
import org.apache.kafka.metadata.MetadataCache;
4039
import org.apache.kafka.server.ClientMetricsManager;
40+
import org.apache.kafka.server.DelegationTokenManager;
4141
import org.apache.kafka.server.authorizer.Authorizer;
4242
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
4343

Diff for: core/src/main/scala/kafka/server/BrokerServer.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ import org.apache.kafka.metadata.publisher.AclPublisher
4646
import org.apache.kafka.security.CredentialProvider
4747
import org.apache.kafka.server.authorizer.Authorizer
4848
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
49-
import org.apache.kafka.server.config.ConfigType
49+
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
5050
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
5151
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
5252
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
5353
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager}
5454
import org.apache.kafka.server.share.session.ShareSessionCache
5555
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
5656
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
57-
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DelayedActionQueue, ProcessRole}
57+
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
5858
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
5959
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
6060

@@ -355,7 +355,7 @@ class BrokerServer(
355355
)
356356

357357
/* start token manager */
358-
tokenManager = new DelegationTokenManager(config, tokenCache, time)
358+
tokenManager = new DelegationTokenManager(new DelegationTokenManagerConfigs(config), tokenCache)
359359

360360
/* initializing the groupConfigManager */
361361
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))

Diff for: core/src/main/scala/kafka/server/ControllerApis.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
5656
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
5757
import org.apache.kafka.common.security.auth.KafkaPrincipal
5858
import org.apache.kafka.common.security.auth.SecurityProtocol
59-
import org.apache.kafka.server.ProcessRole
59+
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
6060
import org.apache.kafka.server.authorizer.Authorizer
6161
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
6262

@@ -970,7 +970,7 @@ class ControllerApis(
970970
new RenewDelegationTokenResponseData()
971971
.setThrottleTimeMs(requestThrottleMs)
972972
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
973-
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
973+
.setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
974974
CompletableFuture.completedFuture[Unit](())
975975
} else {
976976
val context = new ControllerRequestContext(
@@ -994,7 +994,7 @@ class ControllerApis(
994994
new ExpireDelegationTokenResponseData()
995995
.setThrottleTimeMs(requestThrottleMs)
996996
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
997-
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
997+
.setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
998998
CompletableFuture.completedFuture[Unit](())
999999
} else {
10001000
val context = new ControllerRequestContext(

Diff for: core/src/main/scala/kafka/server/ControllerServer.scala

+9-8
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
4040
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
4141
import org.apache.kafka.raft.QuorumConfig
4242
import org.apache.kafka.security.CredentialProvider
43-
import org.apache.kafka.server.ProcessRole
43+
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
4444
import org.apache.kafka.server.authorizer.Authorizer
4545
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
4646
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
47-
import org.apache.kafka.server.config.ConfigType
47+
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
4848
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
4949
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
5050
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -206,9 +206,10 @@ class ControllerServer(
206206
QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled),
207207
controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava)
208208

209+
val delegationTokenManagerConfigs = new DelegationTokenManagerConfigs(config)
209210
val delegationTokenKeyString = {
210-
if (config.tokenAuthEnabled) {
211-
config.delegationTokenSecretKey.value
211+
if (delegationTokenManagerConfigs.tokenAuthEnabled) {
212+
delegationTokenManagerConfigs.delegationTokenSecretKey.value
212213
} else {
213214
null
214215
}
@@ -247,9 +248,9 @@ class ControllerServer(
247248
setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler).
248249
setDelegationTokenCache(tokenCache).
249250
setDelegationTokenSecretKey(delegationTokenKeyString).
250-
setDelegationTokenMaxLifeMs(config.delegationTokenMaxLifeMs).
251-
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
252-
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
251+
setDelegationTokenMaxLifeMs(delegationTokenManagerConfigs.delegationTokenMaxLifeMs).
252+
setDelegationTokenExpiryTimeMs(delegationTokenManagerConfigs.delegationTokenExpiryTimeMs).
253+
setDelegationTokenExpiryCheckIntervalMs(delegationTokenManagerConfigs.delegationTokenExpiryCheckIntervalMs).
253254
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
254255
setInterBrokerListenerName(config.interBrokerListenerName.value()).
255256
setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs).
@@ -361,7 +362,7 @@ class ControllerServer(
361362
config,
362363
sharedServer.metadataPublishingFaultHandler,
363364
"controller",
364-
new DelegationTokenManager(config, tokenCache, time)
365+
new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)
365366
))
366367

367368
// Set up the metrics publisher.

Diff for: core/src/main/scala/kafka/server/DelegationTokenManager.scala

-143
This file was deleted.

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

+8-7
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,10 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
6060
import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator}
6161
import org.apache.kafka.coordinator.share.ShareCoordinator
6262
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
63-
import org.apache.kafka.server.{ClientMetricsManager, ProcessRole}
63+
import org.apache.kafka.server.{ClientMetricsManager, DelegationTokenManager, ProcessRole}
6464
import org.apache.kafka.server.authorizer._
6565
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion}
66+
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
6667
import org.apache.kafka.server.share.context.ShareFetchContext
6768
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey}
6869
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -2201,7 +2202,7 @@ class KafkaApis(val requestChannel: RequestChannel,
22012202
new ExpireDelegationTokenResponseData()
22022203
.setThrottleTimeMs(requestThrottleMs)
22032204
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
2204-
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
2205+
.setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
22052206
} else {
22062207
forwardToController(request)
22072208
}
@@ -2214,7 +2215,7 @@ class KafkaApis(val requestChannel: RequestChannel,
22142215
new RenewDelegationTokenResponseData()
22152216
.setThrottleTimeMs(requestThrottleMs)
22162217
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
2217-
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
2218+
.setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
22182219
} else {
22192220
forwardToController(request)
22202221
}
@@ -2233,7 +2234,7 @@ class KafkaApis(val requestChannel: RequestChannel,
22332234

22342235
if (!allowTokenRequests(request))
22352236
sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, Collections.emptyList)
2236-
else if (!config.tokenAuthEnabled)
2237+
else if (!new DelegationTokenManagerConfigs(config).tokenAuthEnabled)
22372238
sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, Collections.emptyList)
22382239
else {
22392240
val requestPrincipal = request.context.principal
@@ -2242,10 +2243,10 @@ class KafkaApis(val requestChannel: RequestChannel,
22422243
sendResponseCallback(Errors.NONE, Collections.emptyList)
22432244
}
22442245
else {
2245-
val owners = if (describeTokenRequest.data.owners == null)
2246-
None
2246+
val owners: Optional[util.List[KafkaPrincipal]] = if (describeTokenRequest.data.owners == null)
2247+
Optional.empty()
22472248
else
2248-
Some(describeTokenRequest.data.owners.asScala.map(p => new KafkaPrincipal(p.principalType(), p.principalName)).toList)
2249+
Optional.of(describeTokenRequest.data.owners.stream().map(p => new KafkaPrincipal(p.principalType(), p.principalName)).toList)
22492250
def authorizeToken(tokenId: String) = authHelper.authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId)
22502251
def authorizeRequester(owner: KafkaPrincipal) = authHelper.authorize(request.context, DESCRIBE_TOKENS, USER, owner.toString)
22512252
def eligible(token: TokenInformation) = DelegationTokenManager

Diff for: core/src/main/scala/kafka/server/KafkaConfig.scala

+1-8
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
4444
import org.apache.kafka.server.ProcessRole
4545
import org.apache.kafka.server.authorizer.Authorizer
4646
import org.apache.kafka.server.common.MetadataVersion
47-
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
47+
import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
4848
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
4949
import org.apache.kafka.server.metrics.MetricConfigs
5050
import org.apache.kafka.server.util.Csv
@@ -429,13 +429,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
429429
def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
430430
def saslMechanismInterBrokerProtocol = getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
431431

432-
/** ********* DelegationToken Configuration **************/
433-
val delegationTokenSecretKey = getPassword(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG)
434-
val tokenAuthEnabled = delegationTokenSecretKey != null && delegationTokenSecretKey.value.nonEmpty
435-
val delegationTokenMaxLifeMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG)
436-
val delegationTokenExpiryTimeMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG)
437-
val delegationTokenExpiryCheckIntervalMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG)
438-
439432
/** ********* Fetch Configuration **************/
440433
val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG)
441434
val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG)

Diff for: core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package kafka.server.metadata
1919

20-
import kafka.server.DelegationTokenManager
2120
import kafka.server.KafkaConfig
2221
import kafka.utils.Logging
2322
import org.apache.kafka.image.loader.LoaderManifest
2423
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
24+
import org.apache.kafka.server.DelegationTokenManager
2525
import org.apache.kafka.server.fault.FaultHandler
2626

2727

Diff for: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

-8
Original file line numberDiff line numberDiff line change
@@ -1173,14 +1173,6 @@ class KafkaConfigTest {
11731173
assertEquals(123L, config.logFlushIntervalMs)
11741174
assertEquals(CompressionType.SNAPPY, config.groupCoordinatorConfig.offsetTopicCompressionType)
11751175
assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
1176-
assertEquals(false, config.tokenAuthEnabled)
1177-
assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs)
1178-
assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
1179-
assertEquals(1 * 60L * 1000L * 60, config.delegationTokenExpiryCheckIntervalMs)
1180-
1181-
defaults.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, "1234567890")
1182-
val config1 = KafkaConfig.fromProps(defaults)
1183-
assertEquals(true, config1.tokenAuthEnabled)
11841176
}
11851177

11861178
@Test

0 commit comments

Comments
 (0)