Skip to content

Commit d497250

Browse files
KAFKA-18999 Remove BrokerMetadata (#19227)
* Replace `BrokerMetadata` with `UsableBroker` in KRaftMetadataCache and ReassignPartitionsCommand. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 68ecb77 commit d497250

File tree

4 files changed

+21
-84
lines changed

4 files changed

+21
-84
lines changed

core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala

+4-10
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package kafka.server.metadata
1919

2020
import kafka.utils.Logging
21-
import org.apache.kafka.admin.BrokerMetadata
2221
import org.apache.kafka.common._
2322
import org.apache.kafka.common.config.ConfigResource
2423
import org.apache.kafka.common.errors.InvalidTopicException
@@ -341,13 +340,6 @@ class KRaftMetadataCache(
341340
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
342341
}
343342

344-
private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] = {
345-
image.cluster().brokers().values().stream()
346-
.filter(Predicate.not(_.fenced))
347-
.map(broker => new BrokerMetadata(broker.id, broker.rack))
348-
.collect(Collectors.toList())
349-
}
350-
351343
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): util.Optional[Node] = {
352344
util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
353345
.filter(Predicate.not(_.fenced))
@@ -422,11 +414,13 @@ class KRaftMetadataCache(
422414
}
423415

424416
private def getRandomAliveBroker(image: MetadataImage): util.Optional[Integer] = {
425-
val aliveBrokers = getAliveBrokers(image)
417+
val aliveBrokers = image.cluster().brokers().values().stream()
418+
.filter(Predicate.not(_.fenced))
419+
.map(_.id()).toList
426420
if (aliveBrokers.isEmpty) {
427421
util.Optional.empty()
428422
} else {
429-
util.Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
423+
util.Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size)))
430424
}
431425
}
432426

server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java

-52
This file was deleted.

tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java

+10-15
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.kafka.tools.reassign;
1818

19-
import org.apache.kafka.admin.BrokerMetadata;
2019
import org.apache.kafka.clients.admin.Admin;
2120
import org.apache.kafka.clients.admin.AdminClientConfig;
2221
import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -569,8 +568,8 @@ public static Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List
569568
List<String> topicsToReassign = t0.getValue();
570569

571570
Map<TopicPartition, List<Integer>> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign);
572-
List<BrokerMetadata> brokerMetadatas = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness);
573-
Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentAssignments, brokerMetadatas);
571+
List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness);
572+
Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentAssignments, usableBrokers);
574573
System.out.printf("Current partition replica assignment%n%s%n%n",
575574
formatAsReassignmentJson(currentAssignments, Collections.emptyMap()));
576575
System.out.printf("Proposed partition reassignment configuration%n%s%n",
@@ -582,12 +581,12 @@ public static Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List
582581
* Calculate the new partition assignments to suggest in --generate.
583582
*
584583
* @param currentAssignment The current partition assignments.
585-
* @param brokerMetadatas The rack information for each broker.
584+
* @param brokers The rack information for each broker.
586585
*
587586
* @return A map from partitions to the proposed assignments for each.
588587
*/
589588
private static Map<TopicPartition, List<Integer>> calculateAssignment(Map<TopicPartition, List<Integer>> currentAssignment,
590-
List<BrokerMetadata> brokerMetadatas) {
589+
List<UsableBroker> usableBrokers) {
591590
Map<String, List<Entry<TopicPartition, List<Integer>>>> groupedByTopic = new HashMap<>();
592591
for (Entry<TopicPartition, List<Integer>> e : currentAssignment.entrySet())
593592
groupedByTopic.computeIfAbsent(e.getKey().topic(), k -> new ArrayList<>()).add(e);
@@ -601,11 +600,7 @@ private static Map<TopicPartition, List<Integer>> calculateAssignment(Map<TopicP
601600
new ClusterDescriber() {
602601
@Override
603602
public Iterator<UsableBroker> usableBrokers() {
604-
return brokerMetadatas.stream().map(brokerMetadata -> new UsableBroker(
605-
brokerMetadata.id,
606-
brokerMetadata.rack,
607-
false
608-
)).iterator();
603+
return usableBrokers.iterator();
609604
}
610605

611606
@Override
@@ -701,16 +696,16 @@ static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admi
701696
* @return The metadata for each broker that was found.
702697
* Brokers that were not found will be omitted.
703698
*/
704-
static List<BrokerMetadata> getBrokerMetadata(Admin adminClient, List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, InterruptedException {
699+
static List<UsableBroker> getBrokerMetadata(Admin adminClient, List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, InterruptedException {
705700
Set<Integer> brokerSet = new HashSet<>(brokers);
706-
List<BrokerMetadata> results = adminClient.describeCluster().nodes().get().stream()
701+
List<UsableBroker> results = adminClient.describeCluster().nodes().get().stream()
707702
.filter(node -> brokerSet.contains(node.id()))
708703
.map(node -> (enableRackAwareness && node.rack() != null)
709-
? new BrokerMetadata(node.id(), Optional.of(node.rack()))
710-
: new BrokerMetadata(node.id(), Optional.empty())
704+
? new UsableBroker(node.id(), Optional.of(node.rack()), false)
705+
: new UsableBroker(node.id(), Optional.empty(), false)
711706
).collect(Collectors.toList());
712707

713-
long numRackless = results.stream().filter(m -> m.rack.isEmpty()).count();
708+
long numRackless = results.stream().filter(m -> m.rack().isEmpty()).count();
714709
if (enableRackAwareness && numRackless != 0 && numRackless != results.size()) {
715710
throw new AdminOperationException("Not all brokers have rack information. Add " +
716711
"--disable-rack-aware in command line to make replica assignment without rack " +

tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.kafka.tools.reassign;
1818

19-
import org.apache.kafka.admin.BrokerMetadata;
2019
import org.apache.kafka.clients.admin.Config;
2120
import org.apache.kafka.clients.admin.MockAdminClient;
2221
import org.apache.kafka.clients.admin.PartitionReassignment;
@@ -29,6 +28,7 @@
2928
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
3029
import org.apache.kafka.common.utils.Exit;
3130
import org.apache.kafka.common.utils.Time;
31+
import org.apache.kafka.metadata.placement.UsableBroker;
3232
import org.apache.kafka.server.common.AdminCommandFailedException;
3333
import org.apache.kafka.server.common.AdminOperationException;
3434
import org.apache.kafka.server.config.QuotaConfig;
@@ -317,19 +317,19 @@ public void testGetBrokerRackInformation() throws Exception {
317317
build()) {
318318

319319
assertEquals(asList(
320-
new BrokerMetadata(0, Optional.of("rack0")),
321-
new BrokerMetadata(1, Optional.of("rack1"))
320+
new UsableBroker(0, Optional.of("rack0"), false),
321+
new UsableBroker(1, Optional.of("rack1"), false)
322322
), getBrokerMetadata(adminClient, asList(0, 1), true));
323323
assertEquals(asList(
324-
new BrokerMetadata(0, Optional.empty()),
325-
new BrokerMetadata(1, Optional.empty())
324+
new UsableBroker(0, Optional.empty(), false),
325+
new UsableBroker(1, Optional.empty(), false)
326326
), getBrokerMetadata(adminClient, asList(0, 1), false));
327327
assertStartsWith("Not all brokers have rack information",
328328
assertThrows(AdminOperationException.class,
329329
() -> getBrokerMetadata(adminClient, asList(1, 2), true)).getMessage());
330330
assertEquals(asList(
331-
new BrokerMetadata(1, Optional.empty()),
332-
new BrokerMetadata(2, Optional.empty())
331+
new UsableBroker(1, Optional.empty(), false),
332+
new UsableBroker(2, Optional.empty(), false)
333333
), getBrokerMetadata(adminClient, asList(1, 2), false));
334334
}
335335
}

0 commit comments

Comments
 (0)