Skip to content

Commit f472590

Browse files
authored
Replace deprecated methods to support Kafka 4.0.0 (#2254)
1 parent 9dbf09f commit f472590

File tree

4 files changed

+121
-14
lines changed

4 files changed

+121
-14
lines changed

cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java

+75-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package com.linkedin.kafka.cruisecontrol.metricsreporter;
66

77
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.CruiseControlMetricsReporterException;
8+
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
89
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
910
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricsUtils;
1011
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
@@ -33,6 +34,7 @@
3334
import org.apache.kafka.clients.admin.Config;
3435
import org.apache.kafka.clients.admin.CreateTopicsResult;
3536
import org.apache.kafka.clients.admin.DescribeConfigsResult;
37+
import org.apache.kafka.clients.admin.DescribeTopicsResult;
3638
import org.apache.kafka.clients.admin.NewPartitions;
3739
import org.apache.kafka.clients.admin.NewTopic;
3840
import org.apache.kafka.clients.admin.TopicDescription;
@@ -42,6 +44,7 @@
4244
import org.apache.kafka.clients.producer.ProducerRecord;
4345
import org.apache.kafka.clients.producer.RecordMetadata;
4446
import org.apache.kafka.common.KafkaException;
47+
import org.apache.kafka.common.KafkaFuture;
4548
import org.apache.kafka.common.config.ConfigException;
4649
import org.apache.kafka.common.config.ConfigResource;
4750
import org.apache.kafka.common.config.TopicConfig;
@@ -370,16 +373,15 @@ protected void maybeUpdateTopicConfig() {
370373

371374
protected void maybeIncreaseTopicPartitionCount() {
372375
String cruiseControlMetricsTopic = _metricsTopic.name();
376+
373377
try {
374-
// Retrieve topic partition count to check and update.
375-
TopicDescription topicDescription =
376-
_adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic)).values()
377-
.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
378+
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
379+
TopicDescription topicDescription = getTopicDescription(_adminClient, cruiseControlMetricsTopic);
380+
378381
if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
379-
_adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic,
380-
NewPartitions.increaseTo(_metricsTopic.numPartitions())));
382+
_adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic, NewPartitions.increaseTo(_metricsTopic.numPartitions())));
381383
}
382-
} catch (InterruptedException | ExecutionException | TimeoutException e) {
384+
} catch (KafkaTopicDescriptionException e) {
383385
LOG.warn("Partition count increase to {} for topic {} failed{}.", _metricsTopic.numPartitions(), cruiseControlMetricsTopic,
384386
(e.getCause() instanceof ReassignmentInProgressException) ? " due to ongoing reassignment" : "", e);
385387
}
@@ -511,4 +513,70 @@ private void setIfAbsent(Properties props, String key, String value) {
511513
}
512514
}
513515

516+
/**
517+
* Attempts to retrieve the method for mapping topic names to futures from the {@link org.apache.kafka.clients.admin.DescribeTopicsResult} class.
518+
* This method first tries to get the {@code topicNameValues()} method, which is available in Kafka 3.1.0 and later.
519+
* If the method is not found, it falls back to trying to retrieve the {@code values()} method, which is available in Kafka 3.9.0 and earlier.
520+
*
521+
* If neither of these methods is found, a {@link RuntimeException} is thrown.
522+
*
523+
* <p>This method is useful for ensuring compatibility with both older and newer versions of Kafka clients.</p>
524+
*
525+
* @return the {@link Method} object representing the {@code topicNameValues()} or {@code values()} method.
526+
* @throws RuntimeException if neither the {@code values()} nor {@code topicNameValues()} methods are found.
527+
*/
528+
/* test */ static Method topicNameValuesMethod() {
529+
//
530+
Method topicDescriptionMethod = null;
531+
try {
532+
// First we try to get the topicNameValues() method
533+
topicDescriptionMethod = DescribeTopicsResult.class.getMethod("topicNameValues");
534+
} catch (NoSuchMethodException exception) {
535+
LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
536+
}
537+
538+
if (topicDescriptionMethod == null) {
539+
try {
540+
// Second we try to get the values() method
541+
topicDescriptionMethod = DescribeTopicsResult.class.getMethod("values");
542+
} catch (NoSuchMethodException exception) {
543+
LOG.info("Failed to get method values() from DescribeTopicsResult class: ", exception);
544+
}
545+
}
546+
547+
if (topicDescriptionMethod != null) {
548+
return topicDescriptionMethod;
549+
} else {
550+
throw new RuntimeException("Unable to find both values() and topicNameValues() method in the DescribeTopicsResult class ");
551+
}
552+
}
553+
554+
/**
555+
* Retrieves the {@link TopicDescription} for the specified Kafka topic, handling compatibility
556+
* with Kafka versions 4.0 and above. This method uses reflection to invoke the appropriate method
557+
* for retrieving topic description information, depending on the Kafka version.
558+
*
559+
* @param adminClient The Kafka {@link AdminClient} used to interact with the Kafka cluster.
560+
* @param ccMetricsTopic The name of the Kafka topic for which the description is to be retrieved.
561+
*
562+
* @return The {@link TopicDescription} for the specified Kafka topic.
563+
*
564+
* @throws KafkaTopicDescriptionException If an error occurs while retrieving the topic description,
565+
* or if the topic name retrieval method cannot be found or invoked properly. This includes
566+
* exceptions related to reflection (e.g., {@link NoSuchMethodException}), invocation issues,
567+
* execution exceptions, timeouts, and interruptions.
568+
*/
569+
/* test */ static TopicDescription getTopicDescription(AdminClient adminClient, String ccMetricsTopic) throws KafkaTopicDescriptionException {
570+
try {
571+
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
572+
Method topicDescriptionMethod = topicNameValuesMethod();
573+
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(ccMetricsTopic));
574+
Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
575+
.invoke(describeTopicsResult);
576+
return topicDescriptionMap.get(ccMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
577+
} catch (InvocationTargetException | IllegalAccessException | ExecutionException | InterruptedException | TimeoutException e) {
578+
throw new KafkaTopicDescriptionException(String.format("Unable to retrieve config of Cruise Cruise Control metrics topic {}.",
579+
ccMetricsTopic), e);
580+
}
581+
}
514582
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
3+
*/
4+
5+
package com.linkedin.kafka.cruisecontrol.metricsreporter.exception;
6+
7+
/**
8+
* If an error occurs while retrieving the topic description,
9+
* or if the topic name retrieval method cannot be found or invoked properly. This includes
10+
* exceptions related to reflection (e.g., {@link NoSuchMethodException}), invocation issues,
11+
* execution exceptions, timeouts, and interruptions.
12+
*/
13+
public class KafkaTopicDescriptionException extends Exception {
14+
15+
public KafkaTopicDescriptionException(String message, Throwable cause) {
16+
super(message, cause);
17+
}
18+
}

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterAutoCreateTopicTest.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package com.linkedin.kafka.cruisecontrol.metricsreporter;
66

7+
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
78
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
89
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
910
import org.apache.kafka.clients.CommonClientConfigs;
@@ -24,9 +25,9 @@
2425
import org.junit.Test;
2526
import java.util.Collections;
2627
import java.util.Properties;
27-
import java.util.concurrent.ExecutionException;
2828
import java.util.concurrent.atomic.AtomicInteger;
2929

30+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.getTopicDescription;
3031
import static org.junit.Assert.assertEquals;
3132

3233
public class CruiseControlMetricsReporterAutoCreateTopicTest extends CCKafkaClientsIntegrationTestHarness {
@@ -103,11 +104,18 @@ public Properties overridingProps() {
103104
}
104105

105106
@Test
106-
public void testAutoCreateMetricsTopic() throws ExecutionException, InterruptedException {
107+
public void testAutoCreateMetricsTopic() {
107108
Properties props = new Properties();
108109
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
109110
AdminClient adminClient = AdminClient.create(props);
110-
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();
111+
112+
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
113+
TopicDescription topicDescription;
114+
try {
115+
topicDescription = getTopicDescription(adminClient, TOPIC);
116+
} catch (KafkaTopicDescriptionException e) {
117+
throw new RuntimeException(e);
118+
}
111119
// assert that the metrics topic was created with partitions and replicas as configured for the metrics report auto-creation
112120
assertEquals(1, topicDescription.partitions().size());
113121
assertEquals(1, topicDescription.partitions().get(0).replicas().size());

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package com.linkedin.kafka.cruisecontrol.metricsreporter;
66

7+
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
78
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
89
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
910
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedBroker;
@@ -16,7 +17,6 @@
1617
import java.util.Map;
1718
import java.util.Properties;
1819
import java.util.Set;
19-
import java.util.concurrent.ExecutionException;
2020
import java.util.concurrent.atomic.AtomicInteger;
2121
import java.util.regex.Pattern;
2222
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
@@ -44,6 +44,7 @@
4444

4545
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_HOST;
4646
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_PORT;
47+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.getTopicDescription;
4748
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG;
4849
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG;
4950
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG;
@@ -185,12 +186,19 @@ public void testReportingMetrics() {
185186
}
186187

187188
@Test
188-
public void testUpdatingMetricsTopicConfig() throws ExecutionException, InterruptedException {
189+
public void testUpdatingMetricsTopicConfig() throws InterruptedException {
189190
Properties props = new Properties();
190191
setSecurityConfigs(props, "admin");
191192
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
192193
AdminClient adminClient = AdminClient.create(props);
193-
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();
194+
195+
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
196+
TopicDescription topicDescription;
197+
try {
198+
topicDescription = getTopicDescription(adminClient, TOPIC);
199+
} catch (KafkaTopicDescriptionException e) {
200+
throw new RuntimeException(e);
201+
}
194202
assertEquals(1, topicDescription.partitions().size());
195203
// Shutdown broker
196204
_brokers.get(0).shutdown();
@@ -204,8 +212,13 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
204212
broker.startup();
205213
// Wait for broker to boot up
206214
Thread.sleep(5000);
215+
207216
// Check whether the topic config is updated
208-
topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();
217+
try {
218+
topicDescription = getTopicDescription(adminClient, TOPIC);
219+
} catch (KafkaTopicDescriptionException e) {
220+
throw new RuntimeException(e);
221+
}
209222
assertEquals(2, topicDescription.partitions().size());
210223
}
211224

0 commit comments

Comments
 (0)