Skip to content

Commit fb2ce76

Browse files
authored
KAFKA-18888: Add KIP-877 support to Authorizer (#19050)
This also adds metrics to StandardAuthorizer Reviewers: Chia-Ping Tsai <[email protected]>, Ken Huang <[email protected]>, Jhen-Yung Hsu <[email protected]>, TaiJuWu <[email protected]>
1 parent e9ca0bb commit fb2ce76

File tree

43 files changed

+663
-248
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+663
-248
lines changed

checkstyle/import-control-metadata.xml

+3
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,9 @@
176176
<allow pkg="org.apache.kafka.controller" />
177177
<allow pkg="org.apache.kafka.metadata" />
178178
<allow pkg="org.apache.kafka.common.internals" />
179+
<allow pkg="org.apache.kafka.common.metrics" />
180+
<allow pkg="org.apache.kafka.common.metrics.internals" />
181+
<allow pkg="org.apache.kafka.common.metrics.stats" />
179182
</subpackage>
180183
<subpackage name="bootstrap">
181184
<allow pkg="org.apache.kafka.snapshot" />

checkstyle/import-control-server.xml

+1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
<subpackage name="security">
106106
<allow pkg="org.apache.kafka.common.resource" />
107107
<allow pkg="org.apache.kafka.network" />
108+
<allow pkg="org.apache.kafka.server" />
108109
<allow pkg="org.apache.kafka.server.authorizer" />
109110
</subpackage>
110111

clients/src/main/java/org/apache/kafka/common/internals/Plugin.java

+53-13
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@
3030
import java.util.concurrent.atomic.AtomicReference;
3131
import java.util.function.Supplier;
3232

33+
/**
34+
* Plugins have the following tags:
35+
* <ul>
36+
* <li><code>config</code> set to the name of the configuration to specifying the plugin</li>
37+
* <li><code>class</code> set to the name of the instance class</li>
38+
* </ul>
39+
*/
3340
public class Plugin<T> implements Supplier<T>, AutoCloseable {
3441

3542
private final T instance;
@@ -40,14 +47,49 @@ private Plugin(T instance, PluginMetricsImpl pluginMetrics) {
4047
this.pluginMetrics = Optional.ofNullable(pluginMetrics);
4148
}
4249

50+
/**
51+
* Wrap an instance into a Plugin.
52+
* @param instance the instance to wrap
53+
* @param metrics the metrics
54+
* @param tagsSupplier supplier to retrieve the tags
55+
* @return the plugin
56+
*/
57+
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, Supplier<Map<String, String>> tagsSupplier) {
58+
PluginMetricsImpl pluginMetrics = null;
59+
if (instance instanceof Monitorable && metrics != null) {
60+
pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get());
61+
((Monitorable) instance).withPluginMetrics(pluginMetrics);
62+
}
63+
return new Plugin<>(instance, pluginMetrics);
64+
}
65+
66+
/**
67+
* Wrap an instance into a Plugin.
68+
* @param instance the instance to wrap
69+
* @param metrics the metrics
70+
* @param key the value for the <code>config</code> tag
71+
* @return the plugin
72+
*/
4373
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key) {
4474
return wrapInstance(instance, metrics, () -> tags(key, instance));
4575
}
4676

47-
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, Map<String, String> extraTags) {
48-
Map<String, String> tags = tags(key, instance);
49-
tags.putAll(extraTags);
50-
return wrapInstance(instance, metrics, () -> tags);
77+
/**
78+
* Wrap an instance into a Plugin.
79+
* @param instance the instance to wrap
80+
* @param metrics the metrics
81+
* @param name extra tag name to add
82+
* @param value extra tag value to add
83+
* @param key the value for the <code>config</code> tag
84+
* @return the plugin
85+
*/
86+
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, String name, String value) {
87+
Supplier<Map<String, String>> tagsSupplier = () -> {
88+
Map<String, String> tags = tags(key, instance);
89+
tags.put(name, value);
90+
return tags;
91+
};
92+
return wrapInstance(instance, metrics, tagsSupplier);
5193
}
5294

5395
private static <T> Map<String, String> tags(String key, T instance) {
@@ -57,6 +99,13 @@ private static <T> Map<String, String> tags(String key, T instance) {
5799
return tags;
58100
}
59101

102+
/**
103+
* Wrap a list of instances into Plugins.
104+
* @param instances the instances to wrap
105+
* @param metrics the metrics
106+
* @param key the value for the <code>config</code> tag
107+
* @return the list of plugins
108+
*/
60109
public static <T> List<Plugin<T>> wrapInstances(List<T> instances, Metrics metrics, String key) {
61110
List<Plugin<T>> plugins = new ArrayList<>();
62111
for (T instance : instances) {
@@ -65,15 +114,6 @@ public static <T> List<Plugin<T>> wrapInstances(List<T> instances, Metrics metri
65114
return plugins;
66115
}
67116

68-
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, Supplier<Map<String, String>> tagsSupplier) {
69-
PluginMetricsImpl pluginMetrics = null;
70-
if (instance instanceof Monitorable && metrics != null) {
71-
pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get());
72-
((Monitorable) instance).withPluginMetrics(pluginMetrics);
73-
}
74-
return new Plugin<>(instance, pluginMetrics);
75-
}
76-
77117
@Override
78118
public T get() {
79119
return instance;

clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java

+4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@
5959
*
6060
* Authorizer implementation class may optionally implement @{@link org.apache.kafka.common.Reconfigurable}
6161
* to enable dynamic reconfiguration without restarting the broker.
62+
* <p>Authorizer implementation class may also optionally implement {@link org.apache.kafka.common.metrics.Monitorable}
63+
* to enable the authorizer to register metrics. The following tags are automatically added to all metrics registered:
64+
* <code>config</code> set to <code>authorizer.class.name</code>, <code>class</code> set to the Authorizer class name,
65+
* and <code>role</code> set to either <code>broker</code> or <code>controller</code>.
6266
* <p>
6367
* <b>Threading model:</b>
6468
* <ul>

core/src/main/java/kafka/server/QuotaFactory.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.server.quota.ClientQuotaCallback;
2828
import org.apache.kafka.server.quota.QuotaType;
2929

30-
import java.util.Map;
3130
import java.util.Optional;
3231

3332
import scala.Option;
@@ -150,7 +149,7 @@ private static Optional<Plugin<ClientQuotaCallback>> createClientQuotaCallback(
150149
clientQuotaCallback,
151150
metrics,
152151
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
153-
Map.of("role", role)
152+
"role", role
154153
));
155154
}
156155

core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import kafka.server.ReplicaManager;
2929
import kafka.server.share.SharePartitionManager;
3030

31+
import org.apache.kafka.common.internals.Plugin;
3132
import org.apache.kafka.common.metrics.Metrics;
3233
import org.apache.kafka.common.utils.Time;
3334
import org.apache.kafka.coordinator.group.GroupConfigManager;
@@ -58,7 +59,7 @@ public class KafkaApisBuilder {
5859
private ConfigRepository configRepository = null;
5960
private MetadataCache metadataCache = null;
6061
private Metrics metrics = null;
61-
private Optional<Authorizer> authorizer = Optional.empty();
62+
private Optional<Plugin<Authorizer>> authorizerPlugin = Optional.empty();
6263
private QuotaManagers quotas = null;
6364
private FetchManager fetchManager = null;
6465
private SharePartitionManager sharePartitionManager = null;
@@ -131,8 +132,8 @@ public KafkaApisBuilder setMetrics(Metrics metrics) {
131132
return this;
132133
}
133134

134-
public KafkaApisBuilder setAuthorizer(Optional<Authorizer> authorizer) {
135-
this.authorizer = authorizer;
135+
public KafkaApisBuilder setAuthorizerPlugin(Optional<Plugin<Authorizer>> authorizerPlugin) {
136+
this.authorizerPlugin = authorizerPlugin;
136137
return this;
137138
}
138139

@@ -219,7 +220,7 @@ public KafkaApis build() {
219220
configRepository,
220221
metadataCache,
221222
metrics,
222-
OptionConverters.toScala(authorizer),
223+
OptionConverters.toScala(authorizerPlugin),
223224
quotas,
224225
fetchManager,
225226
sharePartitionManager,

core/src/main/scala/kafka/server/AclApis.scala

+8-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import kafka.utils.Logging
2222
import org.apache.kafka.common.acl.AclOperation._
2323
import org.apache.kafka.common.acl.AclBinding
2424
import org.apache.kafka.common.errors._
25+
import org.apache.kafka.common.internals.Plugin
2526
import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult
2627
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult
2728
import org.apache.kafka.common.message._
@@ -46,7 +47,7 @@ import scala.jdk.OptionConverters.RichOptional
4647
* Logic to handle ACL requests.
4748
*/
4849
class AclApis(authHelper: AuthHelper,
49-
authorizer: Option[Authorizer],
50+
authorizerPlugin: Option[Plugin[Authorizer]],
5051
requestHelper: RequestHandlerHelper,
5152
role: ProcessRole,
5253
config: KafkaConfig) extends Logging {
@@ -61,7 +62,7 @@ class AclApis(authHelper: AuthHelper,
6162
def handleDescribeAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
6263
authHelper.authorizeClusterOperation(request, DESCRIBE)
6364
val describeAclsRequest = request.body[DescribeAclsRequest]
64-
authorizer match {
65+
authorizerPlugin match {
6566
case None =>
6667
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
6768
new DescribeAclsResponse(new DescribeAclsResponseData()
@@ -74,7 +75,7 @@ class AclApis(authHelper: AuthHelper,
7475
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
7576
new DescribeAclsResponse(new DescribeAclsResponseData()
7677
.setThrottleTimeMs(requestThrottleMs)
77-
.setResources(DescribeAclsResponse.aclsResources(auth.acls(filter))),
78+
.setResources(DescribeAclsResponse.aclsResources(auth.get.acls(filter))),
7879
describeAclsRequest.version))
7980
}
8081
CompletableFuture.completedFuture[Unit](())
@@ -84,7 +85,7 @@ class AclApis(authHelper: AuthHelper,
8485
authHelper.authorizeClusterOperation(request, ALTER)
8586
val createAclsRequest = request.body[CreateAclsRequest]
8687

87-
authorizer match {
88+
authorizerPlugin match {
8889
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
8990
createAclsRequest.getErrorResponse(requestThrottleMs,
9091
new SecurityDisabledException("No Authorizer is configured.")))
@@ -109,7 +110,7 @@ class AclApis(authHelper: AuthHelper,
109110
}
110111

111112
val future = new CompletableFuture[util.List[AclCreationResult]]()
112-
val createResults = auth.createAcls(request.context, validBindings.asJava).stream().map(_.toCompletableFuture).toList
113+
val createResults = auth.get.createAcls(request.context, validBindings.asJava).stream().map(_.toCompletableFuture).toList
113114

114115
def sendResponseCallback(): Unit = {
115116
val aclCreationResults = allBindings.map { acl =>
@@ -139,7 +140,7 @@ class AclApis(authHelper: AuthHelper,
139140
def handleDeleteAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
140141
authHelper.authorizeClusterOperation(request, ALTER)
141142
val deleteAclsRequest = request.body[DeleteAclsRequest]
142-
authorizer match {
143+
authorizerPlugin match {
143144
case None =>
144145
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
145146
deleteAclsRequest.getErrorResponse(requestThrottleMs,
@@ -148,7 +149,7 @@ class AclApis(authHelper: AuthHelper,
148149
case Some(auth) =>
149150

150151
val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
151-
val deleteResults = auth.deleteAcls(request.context, deleteAclsRequest.filters)
152+
val deleteResults = auth.get.deleteAcls(request.context, deleteAclsRequest.filters)
152153
.stream().map(_.toCompletableFuture).toList
153154

154155
def sendResponseCallback(): Unit = {

core/src/main/scala/kafka/server/AuthHelper.scala

+6-5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.EndpointType
2424
import org.apache.kafka.common.acl.AclOperation
2525
import org.apache.kafka.common.acl.AclOperation.DESCRIBE
2626
import org.apache.kafka.common.errors.ClusterAuthorizationException
27+
import org.apache.kafka.common.internals.Plugin
2728
import org.apache.kafka.common.message.DescribeClusterResponseData
2829
import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBrokerCollection
2930
import org.apache.kafka.common.protocol.Errors
@@ -38,7 +39,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authoriz
3839
import scala.collection.Seq
3940
import scala.jdk.CollectionConverters._
4041

41-
class AuthHelper(authorizer: Option[Authorizer]) {
42+
class AuthHelper(authorizer: Option[Plugin[Authorizer]]) {
4243
def authorize(requestContext: RequestContext,
4344
operation: AclOperation,
4445
resourceType: ResourceType,
@@ -49,7 +50,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
4950
authorizer.forall { authZ =>
5051
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
5152
val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
52-
authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
53+
authZ.get.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
5354
}
5455
}
5556

@@ -64,7 +65,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
6465
case Some(authZ) =>
6566
val resourcePattern = new ResourcePattern(resource.resourceType, resource.name, PatternType.LITERAL)
6667
val actions = supportedOps.map { op => new Action(op, resourcePattern, 1, false, false) }
67-
authZ.authorize(request.context, actions.asJava).asScala
68+
authZ.get.authorize(request.context, actions.asJava).asScala
6869
.zip(supportedOps)
6970
.filter(_._1 == AuthorizationResult.ALLOWED)
7071
.map(_._2).toSet
@@ -77,7 +78,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
7778
def authorizeByResourceType(requestContext: RequestContext, operation: AclOperation,
7879
resourceType: ResourceType): Boolean = {
7980
authorizer.forall { authZ =>
80-
authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
81+
authZ.get.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
8182
}
8283
}
8384

@@ -109,7 +110,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
109110
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
110111
new Action(operation, resource, count, logIfAllowed, logIfDenied)
111112
}.toBuffer
112-
authZ.authorize(requestContext, actions.asJava).asScala
113+
authZ.get.authorize(requestContext, actions.asJava).asScala
113114
.zip(resourceNameToCount.keySet)
114115
.collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED =>
115116
resourceName

core/src/main/scala/kafka/server/BrokerServer.scala

+8-8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import kafka.server.metadata._
2727
import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, SharePartitionManager}
2828
import kafka.utils.CoreUtils
2929
import org.apache.kafka.common.config.ConfigException
30+
import org.apache.kafka.common.internals.Plugin
3031
import org.apache.kafka.common.message.ApiMessageType.ListenerType
3132
import org.apache.kafka.common.metrics.Metrics
3233
import org.apache.kafka.common.network.ListenerName
@@ -103,7 +104,7 @@ class BrokerServer(
103104

104105
@volatile var dataPlaneRequestProcessor: KafkaApis = _
105106

106-
var authorizer: Option[Authorizer] = None
107+
var authorizerPlugin: Option[Plugin[Authorizer]] = None
107108
@volatile var socketServer: SocketServer = _
108109
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
109110

@@ -412,8 +413,7 @@ class BrokerServer(
412413
)
413414

414415
// Create and initialize an authorizer if one is configured.
415-
authorizer = config.createNewAuthorizer()
416-
authorizer.foreach(_.configure(config.originals))
416+
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString)
417417

418418
// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
419419
// for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
@@ -456,7 +456,7 @@ class BrokerServer(
456456
configRepository = metadataCache,
457457
metadataCache = metadataCache,
458458
metrics = metrics,
459-
authorizer = authorizer,
459+
authorizerPlugin = authorizerPlugin,
460460
quotas = quotaManagers,
461461
fetchManager = fetchManager,
462462
sharePartitionManager = sharePartitionManager,
@@ -529,7 +529,7 @@ class BrokerServer(
529529
config.nodeId,
530530
sharedServer.metadataPublishingFaultHandler,
531531
"broker",
532-
authorizer.toJava
532+
authorizerPlugin.toJava
533533
),
534534
sharedServer.initialBrokerMetadataLoadFaultHandler,
535535
sharedServer.metadataPublishingFaultHandler
@@ -586,7 +586,7 @@ class BrokerServer(
586586
// authorizer future is completed.
587587
val endpointReadyFutures = {
588588
val builder = new EndpointReadyFutures.Builder()
589-
builder.build(authorizer.toJava,
589+
builder.build(authorizerPlugin.toJava,
590590
new KafkaAuthorizerServerInfo(
591591
new ClusterResource(clusterId),
592592
config.nodeId,
@@ -645,7 +645,7 @@ class BrokerServer(
645645
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
646646
.withGroupConfigManager(groupConfigManager)
647647
.withPersister(persister)
648-
.withAuthorizer(authorizer.toJava)
648+
.withAuthorizerPlugin(authorizerPlugin.toJava)
649649
.build()
650650
}
651651

@@ -765,7 +765,7 @@ class BrokerServer(
765765
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
766766
if (dataPlaneRequestProcessor != null)
767767
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
768-
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
768+
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
769769

770770
/**
771771
* We must shutdown the scheduler early because otherwise, the scheduler could touch other

0 commit comments

Comments
 (0)