@@ -64,8 +64,15 @@ object ConfigCommand extends Logging {
64
64
65
65
private val BrokerDefaultEntityName = " "
66
66
val BrokerLoggerConfigType = " broker-loggers"
67
- private val BrokerSupportedConfigTypes = ConfigType .ALL .asScala :+ BrokerLoggerConfigType
67
+ private val BrokerSupportedConfigTypes = ConfigType .values.map(_.value) :+ BrokerLoggerConfigType
68
68
private val DefaultScramIterations = 4096
69
+ private val TopicType = ConfigType .TOPIC .value
70
+ private val ClientMetricsType = ConfigType .CLIENT_METRICS .value
71
+ private val BrokerType = ConfigType .BROKER .value
72
+ private val GroupType = ConfigType .GROUP .value
73
+ private val UserType = ConfigType .USER .value
74
+ private val ClientType = ConfigType .CLIENT .value
75
+ private val IpType = ConfigType .IP .value
69
76
70
77
def main (args : Array [String ]): Unit = {
71
78
try {
@@ -171,12 +178,13 @@ object ConfigCommand extends Logging {
171
178
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
172
179
173
180
entityTypeHead match {
174
- case ConfigType . TOPIC | ConfigType . CLIENT_METRICS | ConfigType . BROKER | ConfigType . GROUP =>
181
+ case TopicType | ClientMetricsType | BrokerType | GroupType =>
175
182
val configResourceType = entityTypeHead match {
176
- case ConfigType .TOPIC => ConfigResource .Type .TOPIC
177
- case ConfigType .CLIENT_METRICS => ConfigResource .Type .CLIENT_METRICS
178
- case ConfigType .BROKER => ConfigResource .Type .BROKER
179
- case ConfigType .GROUP => ConfigResource .Type .GROUP
183
+ case TopicType => ConfigResource .Type .TOPIC
184
+ case ClientMetricsType => ConfigResource .Type .CLIENT_METRICS
185
+ case BrokerType => ConfigResource .Type .BROKER
186
+ case GroupType => ConfigResource .Type .GROUP
187
+ case _ => throw new IllegalArgumentException (s " $entityNameHead is not a valid entity-type. " )
180
188
}
181
189
try {
182
190
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
@@ -205,29 +213,29 @@ object ConfigCommand extends Logging {
205
213
val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
206
214
adminClient.incrementalAlterConfigs(Map (configResource -> alterEntries).asJava, alterOptions).all().get(60 , TimeUnit .SECONDS )
207
215
208
- case ConfigType . USER | ConfigType . CLIENT =>
216
+ case UserType | ClientType =>
209
217
val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig .isClientOrUserQuotaConfig)
210
218
val scramConfigsToAddMap = configsToBeAdded.filter(entry => ScramMechanism .isScram(entry._1))
211
219
val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => ScramMechanism .isScram(key) || QuotaConfig .isClientOrUserQuotaConfig(key))
212
220
val hasQuotaConfigsToDelete = configsToBeDeleted.exists(QuotaConfig .isClientOrUserQuotaConfig)
213
221
val scramConfigsToDelete = configsToBeDeleted.filter(ScramMechanism .isScram)
214
222
val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => ScramMechanism .isScram(key) || QuotaConfig .isClientOrUserQuotaConfig(key))
215
- if (entityTypeHead == ConfigType . CLIENT || entityTypes.size == 2 ) { // size==2 for case where users is specified first on the command line, before clients
223
+ if (entityTypeHead == ClientType || entityTypes.size == 2 ) { // size==2 for case where users is specified first on the command line, before clients
216
224
// either just a client or both a user and a client
217
225
if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
218
- throw new IllegalArgumentException (s " Only quota configs can be added for ' ${ ConfigType . CLIENT } ' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}" )
226
+ throw new IllegalArgumentException (s " Only quota configs can be added for ' $ClientType ' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}" )
219
227
if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
220
- throw new IllegalArgumentException (s " Only quota configs can be deleted for ' ${ ConfigType . CLIENT } ' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}" )
228
+ throw new IllegalArgumentException (s " Only quota configs can be deleted for ' $ClientType ' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}" )
221
229
} else { // ConfigType.User
222
230
if (unknownConfigsToAdd.nonEmpty)
223
- throw new IllegalArgumentException (s " Only quota and SCRAM credential configs can be added for ' ${ ConfigType . USER } ' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd" )
231
+ throw new IllegalArgumentException (s " Only quota and SCRAM credential configs can be added for ' $UserType ' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd" )
224
232
if (unknownConfigsToDelete.nonEmpty)
225
- throw new IllegalArgumentException (s " Only quota and SCRAM credential configs can be deleted for ' ${ ConfigType . USER } ' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete" )
233
+ throw new IllegalArgumentException (s " Only quota and SCRAM credential configs can be deleted for ' $UserType ' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete" )
226
234
if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
227
235
if (entityNames.exists(_.isEmpty)) // either --entity-type users --entity-default or --user-defaults
228
236
throw new IllegalArgumentException (" The use of --entity-default or --user-defaults is not allowed with User SCRAM Credentials using --bootstrap-server." )
229
237
if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
230
- throw new IllegalArgumentException (s " Cannot alter both quota and SCRAM credential configs simultaneously for ' ${ ConfigType . USER } ' using --bootstrap-server. " )
238
+ throw new IllegalArgumentException (s " Cannot alter both quota and SCRAM credential configs simultaneously for ' $UserType ' using --bootstrap-server. " )
231
239
}
232
240
}
233
241
@@ -241,10 +249,10 @@ object ConfigCommand extends Logging {
241
249
alterUserScramCredentialConfigs(adminClient, entityNames.head, scramConfigsToAddMap, scramConfigsToDelete)
242
250
}
243
251
244
- case ConfigType . IP =>
252
+ case IpType =>
245
253
val unknownConfigs = (configsToBeAdded.keys ++ configsToBeDeleted).filterNot(key => DynamicConfig .Ip .names.contains(key))
246
254
if (unknownConfigs.nonEmpty)
247
- throw new IllegalArgumentException (s " Only connection quota configs can be added for ' ${ ConfigType . IP } ' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(" ," )}" )
255
+ throw new IllegalArgumentException (s " Only connection quota configs can be added for ' $IpType ' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(" ," )}" )
248
256
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
249
257
250
258
case _ =>
@@ -290,9 +298,9 @@ object ConfigCommand extends Logging {
290
298
throw new InvalidConfigurationException (s " Invalid config(s): ${invalidConfigs.mkString(" ," )}" )
291
299
292
300
val alterEntityTypes = entityTypes.map {
293
- case ConfigType . USER => ClientQuotaEntity .USER
294
- case ConfigType . CLIENT => ClientQuotaEntity .CLIENT_ID
295
- case ConfigType . IP => ClientQuotaEntity .IP
301
+ case UserType => ClientQuotaEntity .USER
302
+ case ClientType => ClientQuotaEntity .CLIENT_ID
303
+ case IpType => ClientQuotaEntity .IP
296
304
case entType => throw new IllegalArgumentException (s " Unexpected entity type: $entType" )
297
305
}
298
306
val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null )
@@ -321,11 +329,11 @@ object ConfigCommand extends Logging {
321
329
val describeAll = opts.options.has(opts.allOpt)
322
330
323
331
entityTypes.head match {
324
- case ConfigType . TOPIC | ConfigType . BROKER | BrokerLoggerConfigType | ConfigType . CLIENT_METRICS | ConfigType . GROUP =>
332
+ case TopicType | BrokerType | BrokerLoggerConfigType | ClientMetricsType | GroupType =>
325
333
describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
326
- case ConfigType . USER | ConfigType . CLIENT =>
334
+ case UserType | ClientType =>
327
335
describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames)
328
- case ConfigType . IP =>
336
+ case IpType =>
329
337
describeQuotaConfigs(adminClient, entityTypes, entityNames)
330
338
case entityType => throw new IllegalArgumentException (s " Invalid entity type: $entityType" )
331
339
}
@@ -335,13 +343,13 @@ object ConfigCommand extends Logging {
335
343
val entities = entityName
336
344
.map(name => List (name))
337
345
.getOrElse(entityType match {
338
- case ConfigType . TOPIC =>
346
+ case TopicType =>
339
347
adminClient.listTopics(new ListTopicsOptions ().listInternal(true )).names().get().asScala.toSeq
340
- case ConfigType . BROKER | BrokerLoggerConfigType =>
348
+ case BrokerType | BrokerLoggerConfigType =>
341
349
adminClient.describeCluster(new DescribeClusterOptions ()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
342
- case ConfigType . CLIENT_METRICS =>
350
+ case ClientMetricsType =>
343
351
adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq
344
- case ConfigType . GROUP =>
352
+ case GroupType =>
345
353
adminClient.listConsumerGroups().all.get.asScala.map(_.groupId).toSeq
346
354
case entityType => throw new IllegalArgumentException (s " Invalid entity type: $entityType" )
347
355
})
@@ -385,11 +393,11 @@ object ConfigCommand extends Logging {
385
393
}
386
394
387
395
val (configResourceType, dynamicConfigSource) = entityType match {
388
- case ConfigType . TOPIC =>
396
+ case TopicType =>
389
397
if (entityName.nonEmpty)
390
398
Topic .validate(entityName)
391
399
(ConfigResource .Type .TOPIC , Some (ConfigEntry .ConfigSource .DYNAMIC_TOPIC_CONFIG ))
392
- case ConfigType . BROKER => entityName match {
400
+ case BrokerType => entityName match {
393
401
case BrokerDefaultEntityName =>
394
402
(ConfigResource .Type .BROKER , Some (ConfigEntry .ConfigSource .DYNAMIC_DEFAULT_BROKER_CONFIG ))
395
403
case _ =>
@@ -400,9 +408,9 @@ object ConfigCommand extends Logging {
400
408
if (entityName.nonEmpty)
401
409
validateBrokerId()
402
410
(ConfigResource .Type .BROKER_LOGGER , None )
403
- case ConfigType . CLIENT_METRICS =>
411
+ case ClientMetricsType =>
404
412
(ConfigResource .Type .CLIENT_METRICS , Some (ConfigEntry .ConfigSource .DYNAMIC_CLIENT_METRICS_CONFIG ))
405
- case ConfigType . GROUP =>
413
+ case GroupType =>
406
414
(ConfigResource .Type .GROUP , Some (ConfigEntry .ConfigSource .DYNAMIC_GROUP_CONFIG ))
407
415
case entityType => throw new IllegalArgumentException (s " Invalid entity type: $entityType" )
408
416
}
@@ -451,7 +459,7 @@ object ConfigCommand extends Logging {
451
459
describeQuotaConfigs(adminClient, entityTypes, entityNames)
452
460
// we describe user SCRAM credentials only when we are not describing client information
453
461
// and we are not given either --entity-default or --user-defaults
454
- if (! entityTypes.contains(ConfigType . CLIENT ) && ! entityNames.contains(" " )) {
462
+ if (! entityTypes.contains(ClientType ) && ! entityNames.contains(" " )) {
455
463
val result = adminClient.describeUserScramCredentials(entityNames.asJava)
456
464
result.users.get(30 , TimeUnit .SECONDS ).asScala.foreach(user => {
457
465
try {
@@ -474,9 +482,9 @@ object ConfigCommand extends Logging {
474
482
private def getAllClientQuotasConfigs (adminClient : Admin , entityTypes : List [String ], entityNames : List [String ]) = {
475
483
val components = entityTypes.map(Some (_)).zipAll(entityNames.map(Some (_)), None , None ).map { case (entityTypeOpt, entityNameOpt) =>
476
484
val entityType = entityTypeOpt match {
477
- case Some (ConfigType . USER ) => ClientQuotaEntity .USER
478
- case Some (ConfigType . CLIENT ) => ClientQuotaEntity .CLIENT_ID
479
- case Some (ConfigType . IP ) => ClientQuotaEntity .IP
485
+ case Some (UserType ) => ClientQuotaEntity .USER
486
+ case Some (ClientType ) => ClientQuotaEntity .CLIENT_ID
487
+ case Some (IpType ) => ClientQuotaEntity .IP
480
488
case Some (_) => throw new IllegalArgumentException (s " Unexpected entity type ${entityTypeOpt.get}" )
481
489
case None => throw new IllegalArgumentException (" More entity names specified than entity types" )
482
490
}
@@ -519,14 +527,14 @@ object ConfigCommand extends Logging {
519
527
520
528
private val nl : String = System .lineSeparator()
521
529
val addConfig : OptionSpec [String ] = parser.accepts(" add-config" , " Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
522
- " For entity-type '" + ConfigType . TOPIC + " ': " + LogConfig .configNames.asScala.map(" \t " + _).mkString(nl, nl, nl) +
523
- " For entity-type '" + ConfigType . BROKER + " ': " + DynamicConfig .Broker .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
524
- " For entity-type '" + ConfigType . USER + " ': " + DynamicConfig .User .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
525
- " For entity-type '" + ConfigType . CLIENT + " ': " + DynamicConfig .Client .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
526
- " For entity-type '" + ConfigType . IP + " ': " + DynamicConfig .Ip .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
527
- " For entity-type '" + ConfigType . CLIENT_METRICS + " ': " + DynamicConfig .ClientMetrics .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
528
- " For entity-type '" + ConfigType . GROUP + " ': " + DynamicConfig .Group .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
529
- s " Entity types ' ${ ConfigType . USER } ' and ' ${ ConfigType . CLIENT } ' may be specified together to update config for clients of a specific user. " )
530
+ " For entity-type '" + TopicType + " ': " + LogConfig .configNames.asScala.map(" \t " + _).mkString(nl, nl, nl) +
531
+ " For entity-type '" + BrokerType + " ': " + DynamicConfig .Broker .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
532
+ " For entity-type '" + UserType + " ': " + DynamicConfig .User .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
533
+ " For entity-type '" + ClientType + " ': " + DynamicConfig .Client .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
534
+ " For entity-type '" + IpType + " ': " + DynamicConfig .Ip .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
535
+ " For entity-type '" + ClientMetricsType + " ': " + DynamicConfig .ClientMetrics .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
536
+ " For entity-type '" + GroupType + " ': " + DynamicConfig .Group .names.asScala.toSeq.sorted.map(" \t " + _).mkString(nl, nl, nl) +
537
+ s " Entity types ' $UserType ' and ' $ClientType ' may be specified together to update config for clients of a specific user. " )
530
538
.withRequiredArg
531
539
.ofType(classOf [String ])
532
540
val addConfigFile : OptionSpec [String ] = parser.accepts(" add-config-file" , " Path to a properties file with configs to add. See add-config for a list of valid configurations." )
@@ -566,19 +574,19 @@ object ConfigCommand extends Logging {
566
574
.ofType(classOf [String ])
567
575
options = parser.parse(args : _* )
568
576
569
- private val entityFlags = List ((topic, ConfigType . TOPIC ),
570
- (client, ConfigType . CLIENT ),
571
- (user, ConfigType . USER ),
572
- (broker, ConfigType . BROKER ),
577
+ private val entityFlags = List ((topic, TopicType ),
578
+ (client, ClientType ),
579
+ (user, UserType ),
580
+ (broker, BrokerType ),
573
581
(brokerLogger, BrokerLoggerConfigType ),
574
- (ip, ConfigType . IP ),
575
- (clientMetrics, ConfigType . CLIENT_METRICS ),
576
- (group, ConfigType . GROUP ))
582
+ (ip, IpType ),
583
+ (clientMetrics, ClientMetricsType ),
584
+ (group, GroupType ))
577
585
578
- private val entityDefaultsFlags = List ((clientDefaults, ConfigType . CLIENT ),
579
- (userDefaults, ConfigType . USER ),
580
- (brokerDefaults, ConfigType . BROKER ),
581
- (ipDefaults, ConfigType . IP ))
586
+ private val entityDefaultsFlags = List ((clientDefaults, ClientType ),
587
+ (userDefaults, UserType ),
588
+ (brokerDefaults, BrokerType ),
589
+ (ipDefaults, IpType ))
582
590
583
591
private [admin] def entityTypes : List [String ] = {
584
592
options.valuesOf(entityType).asScala.toList ++
@@ -624,8 +632,8 @@ object ConfigCommand extends Logging {
624
632
)
625
633
if (entityTypeVals.isEmpty)
626
634
throw new IllegalArgumentException (" At least one entity type must be specified" )
627
- else if (entityTypeVals.size > 1 && ! entityTypeVals.toSet.equals(Set (ConfigType . USER , ConfigType . CLIENT )))
628
- throw new IllegalArgumentException (s " Only ' ${ ConfigType . USER } ' and ' ${ ConfigType . CLIENT } ' entity types may be specified together " )
635
+ else if (entityTypeVals.size > 1 && ! entityTypeVals.toSet.equals(Set (UserType , ClientType )))
636
+ throw new IllegalArgumentException (s " Only ' $UserType ' and ' $ClientType ' entity types may be specified together " )
629
637
630
638
if ((options.has(entityName) || options.has(entityType) || options.has(entityDefault)) &&
631
639
(entityFlags ++ entityDefaultsFlags).exists(entity => options.has(entity._1)))
@@ -638,7 +646,7 @@ object ConfigCommand extends Logging {
638
646
(if (options.has(bootstrapControllerOpt)) 1 else 0 )
639
647
if (numConnectOptions > 1 )
640
648
throw new IllegalArgumentException (" Only one of --bootstrap-server or --bootstrap-controller can be specified" )
641
- if (hasEntityName && (entityTypeVals.contains(ConfigType . BROKER ) || entityTypeVals.contains(BrokerLoggerConfigType ))) {
649
+ if (hasEntityName && (entityTypeVals.contains(BrokerType ) || entityTypeVals.contains(BrokerLoggerConfigType ))) {
642
650
Seq (entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId =>
643
651
try brokerId.toInt catch {
644
652
case _ : NumberFormatException =>
@@ -647,18 +655,18 @@ object ConfigCommand extends Logging {
647
655
}
648
656
}
649
657
650
- if (hasEntityName && entityTypeVals.contains(ConfigType . IP )) {
658
+ if (hasEntityName && entityTypeVals.contains(IpType )) {
651
659
Seq (entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipEntity =>
652
660
if (! isValidIpEntity(ipEntity))
653
661
throw new IllegalArgumentException (s " The entity name for ${entityTypeVals.head} must be a valid IP or resolvable host, but it is: $ipEntity" )
654
662
}
655
663
}
656
664
657
665
if (options.has(describeOpt)) {
658
- if (! (entityTypeVals.contains(ConfigType . USER ) ||
659
- entityTypeVals.contains(ConfigType . CLIENT ) ||
660
- entityTypeVals.contains(ConfigType . BROKER ) ||
661
- entityTypeVals.contains(ConfigType . IP )) && options.has(entityDefault)) {
666
+ if (! (entityTypeVals.contains(UserType ) ||
667
+ entityTypeVals.contains(ClientType ) ||
668
+ entityTypeVals.contains(BrokerType ) ||
669
+ entityTypeVals.contains(IpType )) && options.has(entityDefault)) {
662
670
throw new IllegalArgumentException (s " --entity-default must not be specified with --describe of ${entityTypeVals.mkString(" ," )}" )
663
671
}
664
672
@@ -667,10 +675,10 @@ object ConfigCommand extends Logging {
667
675
}
668
676
669
677
if (options.has(alterOpt)) {
670
- if (entityTypeVals.contains(ConfigType . USER ) ||
671
- entityTypeVals.contains(ConfigType . CLIENT ) ||
672
- entityTypeVals.contains(ConfigType . BROKER ) ||
673
- entityTypeVals.contains(ConfigType . IP )) {
678
+ if (entityTypeVals.contains(UserType ) ||
679
+ entityTypeVals.contains(ClientType ) ||
680
+ entityTypeVals.contains(BrokerType ) ||
681
+ entityTypeVals.contains(IpType )) {
674
682
if (! hasEntityName && ! hasEntityDefault)
675
683
throw new IllegalArgumentException (" An entity-name or default entity must be specified with --alter of users, clients, brokers or ips" )
676
684
} else if (! hasEntityName)
0 commit comments