Skip to content

Commit 0fa8e4b

Browse files
david-simonakatona84
authored andcommitted
Update usages of Metadata to conform to kafka 3.7 interface
1 parent 22abc28 commit 0fa8e4b

File tree

7 files changed

+15
-2
lines changed

7 files changed

+15
-2
lines changed

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataClient.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,12 @@ public synchronized ClusterAndGeneration refreshMetadata(long timeoutMs) {
104104
}
105105

106106
private void doRefreshMetadata(long timeoutMs) {
107-
int updateVersion = _metadata.requestUpdate();
107+
int updateVersion = _metadata.requestUpdate(true);
108108
long remaining = timeoutMs;
109109
Cluster beforeUpdate = cluster();
110110
boolean isMetadataUpdated = _metadata.updateVersion() > updateVersion;
111111
while (!isMetadataUpdated && remaining > 0) {
112-
_metadata.requestUpdate();
112+
_metadata.requestUpdate(false);
113113
long start = _time.milliseconds();
114114
_networkClient.poll(remaining, start);
115115
remaining -= (_time.milliseconds() - start);

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.TimeoutException;
3030
import javax.annotation.Nullable;
31+
import org.apache.kafka.clients.CommonClientConfigs;
3132
import org.apache.kafka.clients.admin.AdminClient;
3233
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
3334
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
@@ -53,6 +54,7 @@ public final class ExecutionUtils {
5354
private static final Logger LOG = LoggerFactory.getLogger(ExecutionUtils.class);
5455
public static final int DEFAULT_RETRY_BACKOFF_BASE = 2;
5556
public static final long METADATA_REFRESH_BACKOFF = 100L;
57+
public static final long METADATA_REFRESH_BACKOFF_MAX = CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS;
5658
public static final long METADATA_EXPIRY_MS = Long.MAX_VALUE;
5759
public static final Duration MIN_ISR_CACHE_CLEANER_PERIOD = Duration.ofMinutes(10);
5860
public static final Duration MIN_ISR_CACHE_CLEANER_INITIAL_DELAY = Duration.ofMinutes(0);

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java

+1
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ public Executor(KafkaCruiseControlConfig config,
192192
_metadataClient = metadataClient != null ? metadataClient
193193
: new MetadataClient(config,
194194
new Metadata(ExecutionUtils.METADATA_REFRESH_BACKOFF,
195+
ExecutionUtils.METADATA_REFRESH_BACKOFF_MAX,
195196
ExecutionUtils.METADATA_EXPIRY_MS,
196197
new LogContext(),
197198
new ClusterResourceListeners()),

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java

+2
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public class LoadMonitor {
8282
// Metadata TTL is set based on experience -- i.e. a short TTL with large metadata may cause excessive load on brokers.
8383
private static final long METADATA_TTL = TimeUnit.SECONDS.toMillis(10);
8484
private static final long METADATA_REFRESH_BACKOFF = TimeUnit.SECONDS.toMillis(5);
85+
private static final long METADATA_REFRESH_BACKOFF_MAX = TimeUnit.SECONDS.toMillis(60);
8586
public static final String KAFKA_ADMIN_CLIENT_OBJECT_CONFIG = "kafka.admin.client.object";
8687
// The maximum time allowed to make a state update. If the state value cannot be updated in time it will be invalidated.
8788
// TODO: Make this configurable.
@@ -125,6 +126,7 @@ public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dr
125126
this(config,
126127
new MetadataClient(config,
127128
new Metadata(METADATA_REFRESH_BACKOFF,
129+
METADATA_REFRESH_BACKOFF_MAX,
128130
config.getLong(MonitorConfig.METADATA_MAX_AGE_MS_CONFIG),
129131
new LogContext(),
130132
new ClusterResourceListeners()),

cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/monitor/MonitorUnitTestUtils.java

+3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.Optional;
1717
import java.util.Set;
1818
import java.util.stream.Collectors;
19+
import org.apache.kafka.clients.CommonClientConfigs;
1920
import org.apache.kafka.clients.Metadata;
2021
import org.apache.kafka.common.Cluster;
2122
import org.apache.kafka.common.Node;
@@ -30,6 +31,7 @@
3031

3132
public final class MonitorUnitTestUtils {
3233
public static final long METADATA_REFRESH_BACKOFF = 10L;
34+
public static final long METADATA_REFRESH_BACKOFF_MAX = CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS;
3335
public static final long METADATA_EXPIRY_MS = 10L;
3436
public static final Node NODE_0 = new Node(0, "localhost", 100, "rack0");
3537
public static final Node NODE_1 = new Node(1, "localhost", 100, "rack1");
@@ -69,6 +71,7 @@ public static Metadata getMetadata(Collection<TopicPartition> partitions) {
6971
}
7072

7173
Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
74+
METADATA_REFRESH_BACKOFF_MAX,
7275
METADATA_EXPIRY_MS,
7376
new LogContext(),
7477
new ClusterResourceListeners());

cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/DefaultMetricSamplerPartitionAssignorTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Set;
2828

2929
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_REFRESH_BACKOFF;
30+
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_REFRESH_BACKOFF_MAX;
3031
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_EXPIRY_MS;
3132
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.NODE_0;
3233
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.nodes;
@@ -62,6 +63,7 @@ public void testAssignment() {
6263
}
6364
Cluster cluster = new Cluster("cluster", Arrays.asList(nodes()), partitions, Collections.emptySet(), Collections.emptySet());
6465
Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
66+
METADATA_REFRESH_BACKOFF_MAX,
6567
METADATA_EXPIRY_MS,
6668
new LogContext(),
6769
new ClusterResourceListeners());

cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/monitor/task/LoadMonitorTaskRunnerTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_EXPIRY_MS;
5252
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_REFRESH_BACKOFF;
53+
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUnitTestUtils.METADATA_REFRESH_BACKOFF_MAX;
5354
import static org.junit.Assert.assertEquals;
5455
import static org.junit.Assert.assertTrue;
5556
import static org.junit.Assert.assertNotNull;
@@ -106,6 +107,7 @@ public int clusterSize() {
106107
public void testSimpleFetch() throws InterruptedException {
107108
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
108109
Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
110+
METADATA_REFRESH_BACKOFF_MAX,
109111
METADATA_EXPIRY_MS,
110112
new LogContext(),
111113
new ClusterResourceListeners());
@@ -156,6 +158,7 @@ public void testSimpleFetch() throws InterruptedException {
156158
public void testSamplingError() {
157159
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
158160
Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
161+
METADATA_REFRESH_BACKOFF_MAX,
159162
METADATA_EXPIRY_MS,
160163
new LogContext(),
161164
new ClusterResourceListeners());

0 commit comments

Comments
 (0)