
Replace deprecated methods to support Kafka 4.0.0 #2254


Merged
merged 9 commits on Mar 17, 2025
Changes from 3 commits
@@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +34,7 @@
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -42,6 +44,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
@@ -370,16 +373,41 @@ protected void maybeUpdateTopicConfig() {

protected void maybeIncreaseTopicPartitionCount() {
String cruiseControlMetricsTopic = _metricsTopic.name();

// Starting with Kafka 4.0.0, the deprecated "values()" method is completely removed from "org.apache.kafka.clients.admin.DescribeTopicsResult",
// so we have to use the new method "topicNameValues()".
// To make sure that the internal build passes, this logic chooses the API based on the Kafka version.
try {
// Retrieve topic partition count to check and update.
TopicDescription topicDescription =
_adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic)).values()
.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
_adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic,
NewPartitions.increaseTo(_metricsTopic.numPartitions())));
Method topicDescriptionMethod = null;
Contributor
It may be worth separating the DescribeTopicsResult method detection out into its own method. This would keep maybeIncreaseTopicPartitionCount() focused on the original logic.

We could separate the DescribeTopicsResult method detection logic out like this:

/**
   * Attempts to retrieve the method for mapping topic names to futures from the {@link org.apache.kafka.clients.admin.DescribeTopicsResult} class.
   * This method first tries to get the {@code topicNameValues()} method, which is available in Kafka 4.x or later.
   * If the method is not found, it falls back to trying to retrieve the {@code values()} method, which is available in Kafka 3.x or earlier.
   *
   * If neither of these methods is found, a {@link RuntimeException} is thrown.
   *
   * <p>This method is useful for ensuring compatibility with both older and newer versions of Kafka clients.</p>
   *
   * @return the {@link Method} object representing the {@code topicNameValues()} or {@code values()} method.
   * @throws RuntimeException if neither the {@code values()} nor {@code topicNameValues()} methods are found.
   */
  /* test */ static Method topicNameValuesMethod() {
    Method topicDescriptionMethod = null;
    try {
      // First we try to get the topicNameValues() method
      topicDescriptionMethod = Class.forName("org.apache.kafka.clients.admin.DescribeTopicsResult").getMethod("topicNameValues");
    } catch (ClassNotFoundException | NoSuchMethodException exception) {
      LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
    }

    if (topicDescriptionMethod == null) {
      try {
        // Second we try to get the values() method
        topicDescriptionMethod = Class.forName("org.apache.kafka.clients.admin.DescribeTopicsResult").getMethod("values");
      } catch (ClassNotFoundException | NoSuchMethodException exception) {
        LOG.info("Failed to get method values() from DescribeTopicsResult class since we are probably on kafka 4.0.0 or newer: ", exception);
      }
    }

    if (topicDescriptionMethod != null) {
      return topicDescriptionMethod;
    } else {
      throw new RuntimeException("Unable to find either the values() or the topicNameValues() method in the DescribeTopicsResult class");
    }
  }

Then call it like this:

Suggested change: replace
Method topicDescriptionMethod = null;
with
Method topicDescriptionMethod = topicNameValuesMethod();
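
For illustration, a condensed sketch of how maybeIncreaseTopicPartitionCount() could read once the helper above exists (same names and error handling as in this diff; a sketch, not the final code):

protected void maybeIncreaseTopicPartitionCount() {
  String cruiseControlMetricsTopic = _metricsTopic.name();
  try {
    // Resolve topicNameValues() (newer clients) or values() (older clients) via the helper above.
    Method topicDescriptionMethod = topicNameValuesMethod();
    // Retrieve topic partition count to check and update.
    @SuppressWarnings("unchecked")
    Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap =
        (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
            .invoke(_adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic)));
    TopicDescription topicDescription =
        topicDescriptionMap.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
      _adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic,
          NewPartitions.increaseTo(_metricsTopic.numPartitions())));
    }
  } catch (InterruptedException | ExecutionException | TimeoutException
           | InvocationTargetException | IllegalAccessException e) {
    LOG.warn("Partition count increase to {} for topic {} failed{}.", _metricsTopic.numPartitions(), cruiseControlMetricsTopic,
        (e.getCause() instanceof ReassignmentInProgressException) ? " due to ongoing reassignment" : "", e);
  }
}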

try {
// First we try to get the topicNameValues() method
topicDescriptionMethod = Class.forName("org.apache.kafka.clients.admin.DescribeTopicsResult").getMethod("topicNameValues");
} catch (ClassNotFoundException | NoSuchMethodException exception) {
LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {

if (topicDescriptionMethod == null) {
try {
// Second we try to get the values() method
topicDescriptionMethod = Class.forName("org.apache.kafka.clients.admin.DescribeTopicsResult").getMethod("values");
} catch (ClassNotFoundException | NoSuchMethodException exception) {
LOG.info("Failed to get method values() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
}
}

if (topicDescriptionMethod != null) {
Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod.invoke(_adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic)));

TopicDescription topicDescription = topicDescriptionMap.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);

if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
_adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic,
NewPartitions.increaseTo(_metricsTopic.numPartitions())));
}
} else {
throw new NoSuchElementException("Unable to find both values() and topicNameValues() method in the DescribeTopicsResult class ");
}
} catch (InterruptedException | ExecutionException | TimeoutException | InvocationTargetException | IllegalAccessException e) {
LOG.warn("Partition count increase to {} for topic {} failed{}.", _metricsTopic.numPartitions(), cruiseControlMetricsTopic,
(e.getCause() instanceof ReassignmentInProgressException) ? " due to ongoing reassignment" : "", e);
}
@@ -15,14 +15,22 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +40,7 @@
public class CruiseControlMetricsReporterAutoCreateTopicTest extends CCKafkaClientsIntegrationTestHarness {
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
protected static final String TEST_TOPIC = "TestTopic";
private static final Logger LOG = LoggerFactory.getLogger(CruiseControlMetricsReporterAutoCreateTopicTest.class);

/**
* Setup the unit test.
@@ -107,7 +116,40 @@ public void testAutoCreateMetricsTopic() throws ExecutionException, InterruptedE
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();

// Starting with Kafka 4.0.0, the deprecated "values()" method is completely removed from "org.apache.kafka.clients.admin.DescribeTopicsResult",
// so we have to use the new method "topicNameValues()".
// To make sure that the internal build passes, this logic chooses the API based on the Kafka version.
Method topicDescriptionMethod = null;
Contributor @kyguy, Mar 7, 2025
I would do the same for the tests: we can separate the DescribeTopicsResult method detection logic into its own method, topicNameValuesMethod(), in the CruiseControlMetricsReporter class, then call topicNameValuesMethod() in our tests to save us the extra lines.

Suggested change: replace
Method topicDescriptionMethod = null;
with
Method topicDescriptionMethod = topicNameValuesMethod();
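
If topicNameValuesMethod() is made reachable from the tests (e.g. a static, package-private helper on CruiseControlMetricsReporter, as suggested), the lookup in this test could shrink to roughly the following sketch; the static call and the unchecked cast are assumptions based on this suggestion, not the final diff, and the test method is assumed to keep declaring InterruptedException and ExecutionException:

// Resolve the right DescribeTopicsResult accessor once via the reporter's helper.
Method topicDescriptionMethod = CruiseControlMetricsReporter.topicNameValuesMethod();
Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap;
try {
  topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>)
      topicDescriptionMethod.invoke(adminClient.describeTopics(Collections.singleton(TOPIC)));
} catch (IllegalAccessException | InvocationTargetException e) {
  throw new RuntimeException(e);
}
TopicDescription topicDescription = topicDescriptionMap.get(TOPIC).get();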


try {
// First we try to get the topicNameValues() method
topicDescriptionMethod = Class.forName("org.apache.kafka.clients.admin.DescribeTopicsResult").getMethod("topicNameValues");
} catch (ClassNotFoundException | NoSuchMethodException exception) {
LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
}

if (topicDescriptionMethod == null) {
try {
// Second we try to get the values() method
topicDescriptionMethod = Class.forName("org.apache.kafka.clients.admin.DescribeTopicsResult").getMethod("values");
} catch (ClassNotFoundException | NoSuchMethodException exception) {
LOG.info("Failed to get method values() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
}
}

Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap;
if (topicDescriptionMethod != null) {
try {
topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod.invoke(adminClient.describeTopics(Collections.singleton(TOPIC)));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
} else {
throw new NoSuchElementException("Unable to find both values() and topicNameValues() method in the DescribeTopicsResult class");
}

TopicDescription topicDescription = topicDescriptionMap.get(TOPIC).get();
// assert that the metrics topic was created with partitions and replicas as configured for the metrics report auto-creation
assertEquals(1, topicDescription.partitions().size());
assertEquals(1, topicDescription.partitions().get(0).replicas().size());
@@ -8,20 +8,26 @@
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedBroker;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -33,6 +39,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
@@ -41,19 +48,24 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_HOST;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_PORT;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils.CLIENT_REQUEST_TIMEOUT_MS;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
private static final String HOST = "127.0.0.1";
private static final Logger LOG = LoggerFactory.getLogger(CruiseControlMetricsReporterTest.class);

/**
* Setup the unit test.
@@ -190,7 +202,42 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
setSecurityConfigs(props, "admin");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();

// Starting with Kafka 4.0.0, the deprecated "values()" method is completely removed from "org.apache.kafka.clients.admin.DescribeTopicsResult",
// so we have to use the new method "topicNameValues()".
// To make sure that the internal build passes, this logic chooses the API based on the Kafka version.
Method topicDescriptionMethod = null;
Contributor
I would do the same for the tests: we can separate the DescribeTopicsResult method detection logic into its own method, topicNameValuesMethod(), in the CruiseControlMetricsReporter class, then call topicNameValuesMethod() in our tests to save us the extra lines.

Suggested change: replace
Method topicDescriptionMethod = null;
with
Method topicDescriptionMethod = topicNameValuesMethod();


try {
// First we try to get the topicNameValues() method
topicDescriptionMethod = Class.forName("org.apache.kafka.clients.admin.DescribeTopicsResult").getMethod("topicNameValues");
} catch (ClassNotFoundException | NoSuchMethodException exception) {
LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
}

if (topicDescriptionMethod == null) {
try {
// Second we try to get the values() method
topicDescriptionMethod = Class.forName("org.apache.kafka.clients.admin.DescribeTopicsResult").getMethod("values");
} catch (ClassNotFoundException | NoSuchMethodException exception) {
LOG.info("Failed to get method values() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
}
}

Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap;

if (topicDescriptionMethod != null) {
try {
topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod.invoke(adminClient.describeTopics(Collections.singleton(TOPIC)));

} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
} else {
throw new NoSuchElementException("Unable to find both values() and topicNameValues() method in the DescribeTopicsResult class");
}

TopicDescription topicDescription = topicDescriptionMap.get(TOPIC).get();
assertEquals(1, topicDescription.partitions().size());
// Shutdown broker
_brokers.get(0).shutdown();
Expand All @@ -205,7 +252,7 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
// Wait for broker to boot up
Thread.sleep(5000);
// Check whether the topic config is updated
topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();
topicDescription = topicDescriptionMap.get(TOPIC).get();
assertEquals(2, topicDescription.partitions().size());
}
