Skip to content

Replace deprecated methods to support Kafka 4.0.0 #2254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.linkedin.kafka.cruisecontrol.metricsreporter;

import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.CruiseControlMetricsReporterException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricsUtils;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
Expand Down Expand Up @@ -33,6 +34,7 @@
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
Expand All @@ -42,6 +44,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
Expand Down Expand Up @@ -370,16 +373,15 @@ protected void maybeUpdateTopicConfig() {

protected void maybeIncreaseTopicPartitionCount() {
String cruiseControlMetricsTopic = _metricsTopic.name();

try {
// Retrieve topic partition count to check and update.
TopicDescription topicDescription =
_adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic)).values()
.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
TopicDescription topicDescription = getTopicDescription(_adminClient, cruiseControlMetricsTopic);

if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
_adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic,
NewPartitions.increaseTo(_metricsTopic.numPartitions())));
_adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic, NewPartitions.increaseTo(_metricsTopic.numPartitions())));
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
} catch (KafkaTopicDescriptionException e) {
LOG.warn("Partition count increase to {} for topic {} failed{}.", _metricsTopic.numPartitions(), cruiseControlMetricsTopic,
(e.getCause() instanceof ReassignmentInProgressException) ? " due to ongoing reassignment" : "", e);
}
Expand Down Expand Up @@ -511,4 +513,70 @@ private void setIfAbsent(Properties props, String key, String value) {
}
}

/**
* Attempts to retrieve the method for mapping topic names to futures from the {@link org.apache.kafka.clients.admin.DescribeTopicsResult} class.
* This method first tries to get the {@code topicNameValues()} method, which is available in Kafka 3.1.0 and later.
* If the method is not found, it falls back to trying to retrieve the {@code values()} method, which is available in Kafka 3.9.0 and earlier.
*
* If neither of these methods is found, a {@link RuntimeException} is thrown.
*
* <p>This method is useful for ensuring compatibility with both older and newer versions of Kafka clients.</p>
*
* @return the {@link Method} object representing the {@code topicNameValues()} or {@code values()} method.
* @throws RuntimeException if neither the {@code values()} nor {@code topicNameValues()} methods are found.
*/
/* test */ static Method topicNameValuesMethod() {
//
Method topicDescriptionMethod = null;
try {
// First we try to get the topicNameValues() method
topicDescriptionMethod = DescribeTopicsResult.class.getMethod("topicNameValues");
} catch (NoSuchMethodException exception) {
LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
}

if (topicDescriptionMethod == null) {
try {
// Second we try to get the values() method
topicDescriptionMethod = DescribeTopicsResult.class.getMethod("values");
} catch (NoSuchMethodException exception) {
LOG.info("Failed to get method values() from DescribeTopicsResult class: ", exception);
}
}

if (topicDescriptionMethod != null) {
return topicDescriptionMethod;
} else {
throw new RuntimeException("Unable to find both values() and topicNameValues() method in the DescribeTopicsResult class ");
}
}

/**
* Retrieves the {@link TopicDescription} for the specified Kafka topic, handling compatibility
* with Kafka versions 4.0 and above. This method uses reflection to invoke the appropriate method
* for retrieving topic description information, depending on the Kafka version.
*
* @param adminClient The Kafka {@link AdminClient} used to interact with the Kafka cluster.
* @param ccMetricsTopic The name of the Kafka topic for which the description is to be retrieved.
*
* @return The {@link TopicDescription} for the specified Kafka topic.
*
* @throws KafkaTopicDescriptionException If an error occurs while retrieving the topic description,
* or if the topic name retrieval method cannot be found or invoked properly. This includes
* exceptions related to reflection (e.g., {@link NoSuchMethodException}), invocation issues,
* execution exceptions, timeouts, and interruptions.
*/
/* test */ static TopicDescription getTopicDescription(AdminClient adminClient, String ccMetricsTopic) throws KafkaTopicDescriptionException {
try {
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
Method topicDescriptionMethod = topicNameValuesMethod();
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(ccMetricsTopic));
Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
.invoke(describeTopicsResult);
return topicDescriptionMap.get(ccMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InvocationTargetException | IllegalAccessException | ExecutionException | InterruptedException | TimeoutException e) {
throw new KafkaTopicDescriptionException(String.format("Unable to retrieve config of Cruise Cruise Control metrics topic {}.",
ccMetricsTopic), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.metricsreporter.exception;

/**
* If an error occurs while retrieving the topic description,
* or if the topic name retrieval method cannot be found or invoked properly. This includes
* exceptions related to reflection (e.g., {@link NoSuchMethodException}), invocation issues,
* execution exceptions, timeouts, and interruptions.
*/
public class KafkaTopicDescriptionException extends Exception {

public KafkaTopicDescriptionException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package com.linkedin.kafka.cruisecontrol.metricsreporter;

import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
Expand All @@ -24,9 +25,9 @@
import org.junit.Test;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.getTopicDescription;
import static org.junit.Assert.assertEquals;

public class CruiseControlMetricsReporterAutoCreateTopicTest extends CCKafkaClientsIntegrationTestHarness {
Expand Down Expand Up @@ -103,11 +104,18 @@ public Properties overridingProps() {
}

@Test
public void testAutoCreateMetricsTopic() throws ExecutionException, InterruptedException {
public void testAutoCreateMetricsTopic() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();

// For compatibility with Kafka 4.0 and beyond we must use new API methods.
TopicDescription topicDescription;
try {
topicDescription = getTopicDescription(adminClient, TOPIC);
} catch (KafkaTopicDescriptionException e) {
throw new RuntimeException(e);
}
// assert that the metrics topic was created with partitions and replicas as configured for the metrics report auto-creation
assertEquals(1, topicDescription.partitions().size());
assertEquals(1, topicDescription.partitions().get(0).replicas().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package com.linkedin.kafka.cruisecontrol.metricsreporter;

import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedBroker;
Expand All @@ -16,7 +17,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
Expand Down Expand Up @@ -44,6 +44,7 @@

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_HOST;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_PORT;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.getTopicDescription;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG;
Expand Down Expand Up @@ -185,12 +186,19 @@ public void testReportingMetrics() {
}

@Test
public void testUpdatingMetricsTopicConfig() throws ExecutionException, InterruptedException {
public void testUpdatingMetricsTopicConfig() throws InterruptedException {
Properties props = new Properties();
setSecurityConfigs(props, "admin");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();

// For compatibility with Kafka 4.0 and beyond we must use new API methods.
TopicDescription topicDescription;
try {
topicDescription = getTopicDescription(adminClient, TOPIC);
} catch (KafkaTopicDescriptionException e) {
throw new RuntimeException(e);
}
assertEquals(1, topicDescription.partitions().size());
// Shutdown broker
_brokers.get(0).shutdown();
Expand All @@ -204,8 +212,13 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
broker.startup();
// Wait for broker to boot up
Thread.sleep(5000);

// Check whether the topic config is updated
topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();
try {
topicDescription = getTopicDescription(adminClient, TOPIC);
} catch (KafkaTopicDescriptionException e) {
throw new RuntimeException(e);
}
assertEquals(2, topicDescription.partitions().size());
}

Expand Down
Loading