17
17
package kafka .server ;
18
18
19
19
import org .apache .kafka .common .TopicPartition ;
20
+ import org .apache .kafka .common .internals .Plugin ;
20
21
import org .apache .kafka .common .metrics .Metrics ;
21
22
import org .apache .kafka .common .utils .Time ;
23
+ import org .apache .kafka .common .utils .Utils ;
22
24
import org .apache .kafka .server .config .ClientQuotaManagerConfig ;
23
25
import org .apache .kafka .server .config .QuotaConfig ;
24
26
import org .apache .kafka .server .config .ReplicationQuotaManagerConfig ;
25
27
import org .apache .kafka .server .quota .ClientQuotaCallback ;
26
28
import org .apache .kafka .server .quota .QuotaType ;
27
29
30
+ import java .util .Map ;
28
31
import java .util .Optional ;
29
32
30
33
import scala .Option ;
34
+ import scala .jdk .javaapi .OptionConverters ;
31
35
32
36
public class QuotaFactory {
33
37
@@ -56,20 +60,20 @@ public static class QuotaManagers {
56
60
private final ReplicationQuotaManager leader ;
57
61
private final ReplicationQuotaManager follower ;
58
62
private final ReplicationQuotaManager alterLogDirs ;
59
- private final Optional <ClientQuotaCallback > clientQuotaCallback ;
63
+ private final Optional <Plugin < ClientQuotaCallback >> clientQuotaCallbackPlugin ;
60
64
61
65
public QuotaManagers (ClientQuotaManager fetch , ClientQuotaManager produce , ClientRequestQuotaManager request ,
62
66
ControllerMutationQuotaManager controllerMutation , ReplicationQuotaManager leader ,
63
67
ReplicationQuotaManager follower , ReplicationQuotaManager alterLogDirs ,
64
- Optional <ClientQuotaCallback > clientQuotaCallback ) {
68
+ Optional <Plugin < ClientQuotaCallback >> clientQuotaCallbackPlugin ) {
65
69
this .fetch = fetch ;
66
70
this .produce = produce ;
67
71
this .request = request ;
68
72
this .controllerMutation = controllerMutation ;
69
73
this .leader = leader ;
70
74
this .follower = follower ;
71
75
this .alterLogDirs = alterLogDirs ;
72
- this .clientQuotaCallback = clientQuotaCallback ;
76
+ this .clientQuotaCallbackPlugin = clientQuotaCallbackPlugin ;
73
77
}
74
78
75
79
public ClientQuotaManager fetch () {
@@ -100,35 +104,56 @@ public ReplicationQuotaManager alterLogDirs() {
100
104
return alterLogDirs ;
101
105
}
102
106
103
- public Optional <ClientQuotaCallback > clientQuotaCallback () {
104
- return clientQuotaCallback ;
107
+ public Optional <Plugin < ClientQuotaCallback >> clientQuotaCallbackPlugin () {
108
+ return clientQuotaCallbackPlugin ;
105
109
}
106
110
107
111
public void shutdown () {
108
112
fetch .shutdown ();
109
113
produce .shutdown ();
110
114
request .shutdown ();
111
115
controllerMutation .shutdown ();
112
- clientQuotaCallback .ifPresent (ClientQuotaCallback :: close );
116
+ clientQuotaCallbackPlugin .ifPresent (plugin -> Utils . closeQuietly ( plugin , "client quota callback plugin" ) );
113
117
}
114
118
}
115
119
116
- public static QuotaManagers instantiate (KafkaConfig cfg , Metrics metrics , Time time , String threadNamePrefix ) {
117
- ClientQuotaCallback clientQuotaCallback = cfg .getConfiguredInstance (
118
- QuotaConfig .CLIENT_QUOTA_CALLBACK_CLASS_CONFIG , ClientQuotaCallback .class );
120
+ public static QuotaManagers instantiate (
121
+ KafkaConfig cfg ,
122
+ Metrics metrics ,
123
+ Time time ,
124
+ String threadNamePrefix ,
125
+ String role
126
+ ) {
127
+ Optional <Plugin <ClientQuotaCallback >> clientQuotaCallbackPlugin = createClientQuotaCallback (cfg , metrics , role );
128
+ Option <Plugin <ClientQuotaCallback >> clientQuotaCallbackPluginOption = OptionConverters .toScala (clientQuotaCallbackPlugin );
119
129
120
130
return new QuotaManagers (
121
- new ClientQuotaManager (clientConfig (cfg ), metrics , QuotaType .FETCH , time , threadNamePrefix , Option . apply ( clientQuotaCallback ) ),
122
- new ClientQuotaManager (clientConfig (cfg ), metrics , QuotaType .PRODUCE , time , threadNamePrefix , Option . apply ( clientQuotaCallback ) ),
123
- new ClientRequestQuotaManager (clientConfig (cfg ), metrics , time , threadNamePrefix , Optional . ofNullable ( clientQuotaCallback ) ),
124
- new ControllerMutationQuotaManager (clientControllerMutationConfig (cfg ), metrics , time , threadNamePrefix , Option . apply ( clientQuotaCallback ) ),
131
+ new ClientQuotaManager (clientConfig (cfg ), metrics , QuotaType .FETCH , time , threadNamePrefix , clientQuotaCallbackPluginOption ),
132
+ new ClientQuotaManager (clientConfig (cfg ), metrics , QuotaType .PRODUCE , time , threadNamePrefix , clientQuotaCallbackPluginOption ),
133
+ new ClientRequestQuotaManager (clientConfig (cfg ), metrics , time , threadNamePrefix , clientQuotaCallbackPlugin ),
134
+ new ControllerMutationQuotaManager (clientControllerMutationConfig (cfg ), metrics , time , threadNamePrefix , clientQuotaCallbackPluginOption ),
125
135
new ReplicationQuotaManager (replicationConfig (cfg ), metrics , QuotaType .LEADER_REPLICATION , time ),
126
136
new ReplicationQuotaManager (replicationConfig (cfg ), metrics , QuotaType .FOLLOWER_REPLICATION , time ),
127
137
new ReplicationQuotaManager (alterLogDirsReplicationConfig (cfg ), metrics , QuotaType .ALTER_LOG_DIRS_REPLICATION , time ),
128
- Optional . ofNullable ( clientQuotaCallback )
138
+ clientQuotaCallbackPlugin
129
139
);
130
140
}
131
141
142
+ private static Optional <Plugin <ClientQuotaCallback >> createClientQuotaCallback (
143
+ KafkaConfig cfg ,
144
+ Metrics metrics ,
145
+ String role
146
+ ) {
147
+ ClientQuotaCallback clientQuotaCallback = cfg .getConfiguredInstance (
148
+ QuotaConfig .CLIENT_QUOTA_CALLBACK_CLASS_CONFIG , ClientQuotaCallback .class );
149
+ return clientQuotaCallback == null ? Optional .empty () : Optional .of (Plugin .wrapInstance (
150
+ clientQuotaCallback ,
151
+ metrics ,
152
+ QuotaConfig .CLIENT_QUOTA_CALLBACK_CLASS_CONFIG ,
153
+ Map .of ("role" , role )
154
+ ));
155
+ }
156
+
132
157
private static ClientQuotaManagerConfig clientConfig (KafkaConfig cfg ) {
133
158
return new ClientQuotaManagerConfig (
134
159
cfg .quotaConfig ().numQuotaSamples (),
@@ -156,4 +181,4 @@ private static ReplicationQuotaManagerConfig alterLogDirsReplicationConfig(Kafka
156
181
cfg .quotaConfig ().alterLogDirsReplicationQuotaWindowSizeSeconds ()
157
182
);
158
183
}
159
- }
184
+ }
0 commit comments