
Replace deprecated methods to support Kafka 4.0.0 #2254

Merged 9 commits on Mar 17, 2025
Changes from 8 commits
@@ -33,6 +33,7 @@
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -42,6 +43,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
@@ -370,16 +372,22 @@ protected void maybeUpdateTopicConfig() {

protected void maybeIncreaseTopicPartitionCount() {
String cruiseControlMetricsTopic = _metricsTopic.name();

try {
// Retrieve topic partition count to check and update.
TopicDescription topicDescription =
_adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic)).values()
.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
Method topicDescriptionMethod = topicNameValuesMethod();

DescribeTopicsResult describeTopicsResult = _adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic));

Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
.invoke(describeTopicsResult);

TopicDescription topicDescription = topicDescriptionMap.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@kyguy (Contributor) commented on Mar 13, 2025:

We could also abstract the lines above into a getTopicDescription() method to simplify the code here and in the tests even further. If you want to go that route, you could separate the code like this:

  /**
   * Retrieves the {@link TopicDescription} for the specified Kafka topic, handling compatibility
   * with Kafka versions 4.0 and above. This method uses reflection to invoke the appropriate method
   * for retrieving topic description information, depending on the Kafka version.
   *
   * @param _adminClient The Kafka {@link AdminClient} used to interact with the Kafka cluster.
   * @param cruiseControlMetricsTopic The name of the Kafka topic for which the description is to be retrieved.
   *
   * @return The {@link TopicDescription} for the specified Kafka topic.
   *
   * @throws KafkaTopicDescriptionException If an error occurs while retrieving the topic description,
   *         or if the topic name retrieval method cannot be found or invoked properly. This includes
   *         exceptions related to reflection (e.g., {@link NoSuchMethodException}), invocation issues,
   *         execution exceptions, timeouts, and interruptions.
   */
  /* test */ static TopicDescription getTopicDescription(AdminClient _adminClient, String cruiseControlMetricsTopic) throws KafkaTopicDescriptionException {
    try {
      // For compatibility with Kafka 4.0 and beyond we must use new API methods.
      Method topicDescriptionMethod = topicNameValuesMethod();
      DescribeTopicsResult describeTopicsResult = _adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic));
      Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
              .invoke(describeTopicsResult);

      return topicDescriptionMap.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }
    catch (InvocationTargetException | IllegalAccessException | ExecutionException | InterruptedException | TimeoutException | NoSuchMethodException e) {
      throw new KafkaTopicDescriptionException(String.format("Unable to retrieve config of Cruise Control metrics topic %s.", cruiseControlMetricsTopic), e);
    }
  }

Note that if you did this, it would probably be a good idea to create a custom exception class in the cruisecontrol.metricsreporter/exception folder so you don't have to add all the Exception classes to the method signatures where this method is called.

public class KafkaTopicDescriptionException extends Exception {
    public KafkaTopicDescriptionException(String message, Throwable cause) {
        super(message, cause);
    }
}

This way, you can just call getTopicDescription() whenever you need the topic description for a topic, without calling and invoking the right reflection method at each call site; that would all be handled in this one method in the CruiseControlMetricsReporter class.
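
For illustration, with that helper and exception in place, the call site in maybeIncreaseTopicPartitionCount() could shrink to something like the sketch below (getTopicDescription() and KafkaTopicDescriptionException are the names from the suggestion above, not code in this PR):

  protected void maybeIncreaseTopicPartitionCount() {
    String cruiseControlMetricsTopic = _metricsTopic.name();
    try {
      // All of the reflection-based version handling is hidden inside the helper.
      TopicDescription topicDescription = getTopicDescription(_adminClient, cruiseControlMetricsTopic);
      if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
        _adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic,
            NewPartitions.increaseTo(_metricsTopic.numPartitions())));
      }
    } catch (KafkaTopicDescriptionException e) {
      LOG.warn("Partition count increase to {} for topic {} failed.", _metricsTopic.numPartitions(), cruiseControlMetricsTopic, e);
    }
  }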

Contributor Author:

I was looking into applying this suggestion, but there is one more thing: the call made in CruiseControlMetricsReporter is

      return topicDescriptionMap.get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);

while in the test we call get() without the CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS arguments:

topicDescriptionMap.get(TOPIC).get()

Contributor:

Does the inclusion of .get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) interfere with the tests? From what I understand, it still returns the TopicDescription object we need.

Even if the inclusion of .get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) did cause any problems in the tests, couldn't we remove it from the getTopicDescription() method itself and append it to the end of the places where we need it?
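
If the timeout does get in the way, one option is a variant of the helper that returns the KafkaFuture and lets each caller decide whether to pass a timeout. A minimal sketch, assuming the topicNameValuesMethod() helper and the KafkaTopicDescriptionException class from the earlier suggestion (the method name topicDescriptionFuture is hypothetical):

  /* test */ static KafkaFuture<TopicDescription> topicDescriptionFuture(AdminClient adminClient, String topic)
      throws KafkaTopicDescriptionException {
    try {
      // Resolve topicNameValues() (Kafka >= 3.1) or values() (Kafka <= 3.9) via reflection.
      Method topicDescriptionMethod = topicNameValuesMethod();
      DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic));
      @SuppressWarnings("unchecked")
      Map<String, KafkaFuture<TopicDescription>> futures =
          (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod.invoke(describeTopicsResult);
      return futures.get(topic);
    } catch (InvocationTargetException | IllegalAccessException e) {
      throw new KafkaTopicDescriptionException(
          String.format("Unable to describe Cruise Control metrics topic %s.", topic), e);
    }
  }

  // The reporter would then call topicDescriptionFuture(_adminClient, topic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS),
  // while the tests can simply call topicDescriptionFuture(adminClient, TOPIC).get().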


if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
_adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic,
NewPartitions.increaseTo(_metricsTopic.numPartitions())));
_adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic, NewPartitions.increaseTo(_metricsTopic.numPartitions())));
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
} catch (InterruptedException | ExecutionException | TimeoutException | InvocationTargetException | IllegalAccessException e) {
LOG.warn("Partition count increase to {} for topic {} failed{}.", _metricsTopic.numPartitions(), cruiseControlMetricsTopic,
(e.getCause() instanceof ReassignmentInProgressException) ? " due to ongoing reassignment" : "", e);
}
@@ -511,4 +519,41 @@ private void setIfAbsent(Properties props, String key, String value) {
}
}

/**
* Attempts to retrieve the method for mapping topic names to futures from the {@link org.apache.kafka.clients.admin.DescribeTopicsResult} class.
* This method first tries to get the {@code topicNameValues()} method, which is available in Kafka 3.1.0 and later.
* If that method is not found, it falls back to the {@code values()} method, which is available only in Kafka 3.9.0 and earlier (it was removed in Kafka 4.0.0).
*
* If neither of these methods is found, a {@link RuntimeException} is thrown.
*
* <p>This method is useful for ensuring compatibility with both older and newer versions of Kafka clients.</p>
*
* @return the {@link Method} object representing the {@code topicNameValues()} or {@code values()} method.
* @throws RuntimeException if neither the {@code values()} nor {@code topicNameValues()} methods are found.
*/
/* test */ static Method topicNameValuesMethod() {
Method topicDescriptionMethod = null;
try {
// First we try to get the topicNameValues() method
topicDescriptionMethod = DescribeTopicsResult.class.getMethod("topicNameValues");
} catch (NoSuchMethodException exception) {
LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
}

if (topicDescriptionMethod == null) {
try {
// Second we try to get the values() method
topicDescriptionMethod = DescribeTopicsResult.class.getMethod("values");
} catch (NoSuchMethodException exception) {
LOG.info("Failed to get method values() from DescribeTopicsResult class: ", exception);
}
}

if (topicDescriptionMethod != null) {
return topicDescriptionMethod;
} else {
throw new RuntimeException("Unable to find both values() and topicNameValues() method in the DescribeTopicsResult class ");
}
}
}
@@ -15,18 +15,23 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.topicNameValuesMethod;
import static org.junit.Assert.assertEquals;

public class CruiseControlMetricsReporterAutoCreateTopicTest extends CCKafkaClientsIntegrationTestHarness {
@@ -107,7 +112,21 @@ public void testAutoCreateMetricsTopic() throws ExecutionException, InterruptedE
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();

// For compatibility with Kafka 4.0 and beyond we must use new API methods.
Method topicDescriptionMethod = topicNameValuesMethod();

Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap;

try {
topicDescriptionMap =
(Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
.invoke(adminClient.describeTopics(Collections.singleton(TOPIC)));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}

TopicDescription topicDescription = topicDescriptionMap.get(TOPIC).get();
// assert that the metrics topic was created with partitions and replicas as configured for the metrics report auto-creation
assertEquals(1, topicDescription.partitions().size());
assertEquals(1, topicDescription.partitions().get(0).replicas().size());
@@ -8,6 +8,8 @@
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedBroker;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -33,6 +35,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
@@ -44,6 +47,7 @@

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_HOST;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_PORT;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.topicNameValuesMethod;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG;
@@ -190,7 +194,20 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
setSecurityConfigs(props, "admin");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();

// For compatibility with Kafka 4.0 and beyond we must use new API methods.
Method topicDescriptionMethod = topicNameValuesMethod();

Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap;

try {
topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
.invoke(adminClient.describeTopics(Collections.singleton(TOPIC)));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}

TopicDescription topicDescription = topicDescriptionMap.get(TOPIC).get();
assertEquals(1, topicDescription.partitions().size());
// Shutdown broker
_brokers.get(0).shutdown();
@@ -204,8 +221,16 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
broker.startup();
// Wait for broker to boot up
Thread.sleep(5000);

try {
topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
.invoke(adminClient.describeTopics(Collections.singleton(TOPIC)));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}

// Check whether the topic config is updated
topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();
topicDescription = topicDescriptionMap.get(TOPIC).get();
assertEquals(2, topicDescription.partitions().size());
}
