Skip to content

Commit bdc0868

Browse files
k0b3rITHao Geng
authored and
Hao Geng
committed
Make startup more robust and prevent auto topic creation when using CruiseControlMetricsReporterSampler (#2211)
1 parent 65c278d commit bdc0868

File tree

4 files changed

+54
-9
lines changed

4 files changed

+54
-9
lines changed

cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsUtils.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,11 @@ public static void maybeUpdateConfig(Set<AlterConfigOp> configsToAlter,
163163
* @param scaleMs the scale for computing the delay
164164
* @param base the base for computing the delay
165165
* @param maxAttempts the max number of attempts on calling the function
166+
* @param maxSleepMs the maximum sleep time between retries
166167
* @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded.
167168
* {@code true} if the function stopped requiring a retry before exceeding the max attempts.
168169
*/
169-
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
170+
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts, int maxSleepMs) {
170171
if (maxAttempts > 0) {
171172
int attempts = 0;
172173
long timeToSleep = scaleMs;
@@ -179,6 +180,9 @@ public static boolean retry(Supplier<Boolean> function, long scaleMs, int base,
179180
return false;
180181
}
181182
timeToSleep *= base;
183+
if (maxSleepMs > 0 && timeToSleep > maxSleepMs) {
184+
timeToSleep = maxSleepMs;
185+
}
182186
Thread.sleep(timeToSleep);
183187
} catch (InterruptedException ignored) {
184188

@@ -200,7 +204,21 @@ public static boolean retry(Supplier<Boolean> function, long scaleMs, int base,
200204
* {@code true} if the function stopped requiring a retry before exceeding the max attempts.
201205
*/
202206
public static boolean retry(Supplier<Boolean> function, int maxAttempts) {
203-
return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts);
207+
return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts, -1);
208+
}
209+
210+
/**
211+
* Retries the {@code Supplier<Boolean>} function while it returns {@code true} and for the specified max number of attempts.
212+
* It uses -1 as maxSleepMs, to not limit the sleep time between retries.
213+
* @param function the code to call and retry if needed
214+
* @param scaleMs the scale for computing the delay
215+
* @param base the base for computing the delay
216+
* @param maxAttempts the max number of attempts on calling the function
217+
* @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded.
218+
* {@code true} if the function stopped requiring a retry before exceeding the max attempts.
219+
*/
220+
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
221+
return retry(function, scaleMs, base, maxAttempts, -1);
204222
}
205223

206224
/**

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java

+1
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,7 @@ public static <K, KT extends Deserializer<K>, V, VT extends Deserializer<V>> Con
906906
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
907907
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
908908
consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, configs.get(RECONNECT_BACKOFF_MS_CONFIG).toString());
909+
consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
909910
return new KafkaConsumer<>(consumerProps);
910911
}
911912

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
88
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
9+
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils;
910
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
1011
import java.time.Duration;
1112
import java.util.Collections;
@@ -36,6 +37,10 @@ public class CruiseControlMetricsReporterSampler extends AbstractMetricSampler {
3637
// Configurations
3738
public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers";
3839
public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic";
40+
public static final String METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS = "metric.reporter.sampler.topic.assert.attempts";
41+
42+
public static final int METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT = 5;
43+
3944
@Deprecated
4045
public static final String METRIC_REPORTER_SAMPLER_GROUP_ID = "metric.reporter.sampler.group.id";
4146
public static final Duration METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = Duration.ofMillis(5000L);
@@ -151,13 +156,26 @@ protected boolean refreshPartitionAssignment() {
151156
return false;
152157
}
153158

159+
private boolean isMetricsTopicExists() {
160+
Map<String, List<PartitionInfo>> topics = _metricConsumer.listTopics();
161+
if (!topics.containsKey(_metricReporterTopic)) {
162+
return false;
163+
}
164+
return true;
165+
}
166+
154167
@Override
155168
public void configure(Map<String, ?> configs) {
156169
super.configure(configs);
157170
_metricReporterTopic = (String) configs.get(METRIC_REPORTER_TOPIC);
158171
if (_metricReporterTopic == null) {
159172
_metricReporterTopic = CruiseControlMetricsReporterConfig.DEFAULT_CRUISE_CONTROL_METRICS_TOPIC;
160173
}
174+
String metricTopicAssertAttemptsStr = (String) configs.get(METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS);
175+
int metricTopicAssertAttempts = METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT;
176+
if (metricTopicAssertAttemptsStr != null) {
177+
metricTopicAssertAttempts = Integer.parseInt(metricTopicAssertAttemptsStr);
178+
}
161179
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);
162180
_acceptableMetricRecordProduceDelayMs = ACCEPTABLE_NETWORK_DELAY_MS
163181
+ Math.max(reporterConfig.getLong(CruiseControlMetricsReporterConfig
@@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) {
166184
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG));
167185
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX);
168186
_currentPartitionAssignment = Collections.emptySet();
187+
188+
LOG.info("Waiting for metrics reporter topic [{}] to be available in the Kafka cluster.", _metricReporterTopic);
189+
if (!CruiseControlMetricsUtils.retry(() -> !this.isMetricsTopicExists(), 2000, 2, metricTopicAssertAttempts, 30_000)) {
190+
throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic
191+
+ "] in the Kafka cluster.");
192+
}
193+
169194
if (refreshPartitionAssignment()) {
170-
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches "
171-
+ _metricReporterTopic + " in the target cluster.");
195+
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches ["
196+
+ _metricReporterTopic + "] in the Kafka cluster.");
172197
}
173198
}
174199

docs/wiki/User Guide/Configurations.md

+6-5
Original file line numberDiff line numberDiff line change
@@ -287,11 +287,12 @@ We are still trying to improve cruise control. And following are some configurat
287287
## Configurations of pluggable classes
288288

289289
### CruiseControlMetricsReporterSampler configurations
290-
| Name | Type | Required? | Default Value | Description |
291-
|-------------------------------------------|--------|-----------|------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
292-
| metric.reporter.sampler.bootstrap.servers | String | N | The same as `bootstrap.servers` config from Cruise Control | The Kafka cluster to consume the interested metrics collected by CruiseControlMetricsReporter. |
293-
| metric.reporter.topic | String | N | "__CruiseControlMetrics" | The exact topic name from which the sampler should be consuming the interested metrics from. |
294-
| metric.reporter.sampler.group.id | String | N | 60,000 | The consumer group id to use for the consumers to consume from the Kafka cluster. |
290+
| Name | Type | Required? | Default Value | Description |
291+
|------------------------------------------------|---------|-----------|------------------------------------------------------------|------------------------------------------------------------------------------------------------------|
292+
| metric.reporter.sampler.bootstrap.servers | String | N | The same as `bootstrap.servers` config from Cruise Control | The Kafka cluster to consume the interested metrics collected by CruiseControlMetricsReporter. |
293+
| metric.reporter.topic | String | N | "__CruiseControlMetrics" | The exact topic name from which the sampler should be consuming the interested metrics from. |
294+
| metric.reporter.sampler.group.id | String | N | 60,000 | The consumer group id to use for the consumers to consume from the Kafka cluster. |
295+
| metric.reporter.sampler.topic.assert.attempts | Integer | N | 5 | Number of attempts while waiting for metrics topic to appear in the Kafka cluster during the startup.|
295296

296297
### PrometheusMetricSampler configurations
297298
| Name | Type | Required? | Default Value | Description |

0 commit comments

Comments
 (0)