
Commit 70e51ec

Fix ignored broker failure anomalies when self healing is disabled (#2270)
1 parent eeb0123 commit 70e51ec

File tree

6 files changed (+146, -28 lines)

6 files changed

+146
-28
lines changed

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AbstractBrokerFailureDetector.java

+15 -2
@@ -39,6 +39,7 @@ public abstract class AbstractBrokerFailureDetector extends AbstractAnomalyDetec
   public static final String FAILED_BROKERS_OBJECT_CONFIG = "failed.brokers.object";
   // Config to indicate whether detected broker failures are fixable or not.
   public static final String BROKER_FAILURES_FIXABLE_CONFIG = "broker.failures.fixable.object";
+  public static final String BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT = "broker.failure.check.with.delay.retry.count";

   protected final Map<Integer, Long> _failedBrokers;
   protected final File _failedBrokersFile;
@@ -62,6 +63,16 @@ public abstract class AbstractBrokerFailureDetector extends AbstractAnomalyDetec
    * @param skipReportingIfNotUpdated {@code true} if broker failure reporting will be skipped if failed brokers have not changed.
    */
   synchronized void detectBrokerFailures(boolean skipReportingIfNotUpdated) {
+    detectBrokerFailures(skipReportingIfNotUpdated, 0);
+  }
+
+  /**
+   * Detect broker failures. Skip reporting if the failed brokers have not changed and skipReportingIfNotUpdated is true.
+   *
+   * @param skipReportingIfNotUpdated {@code true} if broker failure reporting will be skipped if failed brokers have not changed.
+   * @param brokerFailureCheckWithDelayRetryCount The number of check-with-delay retries performed so far for this failure.
+   */
+  synchronized void detectBrokerFailures(boolean skipReportingIfNotUpdated, int brokerFailureCheckWithDelayRetryCount) {
     try {
       _aliveBrokers = aliveBrokers();

@@ -73,7 +84,8 @@ synchronized void detectBrokerFailures(boolean skipReportingIfNotUpdated) {
       }
       if (!skipReportingIfNotUpdated || updated) {
         // Report the failures to anomaly detector to handle.
-        reportBrokerFailures();
+        // Pass the current retry count to the broker failure.
+        reportBrokerFailures(brokerFailureCheckWithDelayRetryCount);
       }
     } catch (Throwable e) {
       LOG.warn("Broker failure detector received exception: ", e);
@@ -185,7 +197,7 @@ private boolean tooManyFailedBrokers(int failedBrokerCount, int aliveBrokerCount
         || (double) failedBrokerCount / (failedBrokerCount + aliveBrokerCount) > _fixableFailedBrokerPercentageThreshold;
   }

-  private void reportBrokerFailures() {
+  private void reportBrokerFailures(int brokerFailureCheckWithDelayRetryCount) {
     if (!_failedBrokers.isEmpty()) {
       Map<String, Object> parameterConfigOverrides = new HashMap<>();
       parameterConfigOverrides.put(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl);
@@ -194,6 +206,7 @@ private void reportBrokerFailures() {
       parameterConfigOverrides.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, _kafkaCruiseControl.timeMs());
       parameterConfigOverrides.put(BROKER_FAILURES_FIXABLE_CONFIG,
                                    !tooManyFailedBrokers(failedBrokers.size(), _aliveBrokers.size()));
+      parameterConfigOverrides.put(BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT, brokerFailureCheckWithDelayRetryCount);

       BrokerFailures brokerFailures = _kafkaCruiseControl.config().getConfiguredInstance(AnomalyDetectorConfig.BROKER_FAILURES_CLASS_CONFIG,
                                                                                          BrokerFailures.class,
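
Note that the existing single-argument entry point is kept and simply delegates with a retry count of 0, so existing callers compile and behave as before. A minimal, self-contained sketch of that backward-compatible overload pattern (class and method names here are illustrative, not Cruise Control's):

// Sketch: extending an API with an extra parameter while keeping old call sites intact.
public class OverloadDelegationSketch {

  // Original signature: still compiles for every existing caller.
  static void detect(boolean skipIfUnchanged) {
    detect(skipIfUnchanged, 0); // delegate with the neutral default retry count
  }

  // New signature carrying the retry count.
  static void detect(boolean skipIfUnchanged, int retryCount) {
    System.out.println("skipIfUnchanged=" + skipIfUnchanged + ", retryCount=" + retryCount);
  }

  public static void main(String[] args) {
    detect(true);     // old-style call: retryCount defaults to 0
    detect(false, 5); // new-style call, as made from the rescheduled check
  }
}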

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java

+18 -7
@@ -350,7 +350,7 @@ public void run() {
         _anomalyInProgress = null;
         try {
           _anomalyInProgress = _anomalies.take();
-          LOG.trace("Processing anomaly {}.", _anomalyInProgress);
+          LOG.trace("Processing anomaly id: {} {}.", _anomalyInProgress.anomalyId(), _anomalyInProgress);
           if (_anomalyInProgress == SHUTDOWN_ANOMALY) {
             // Service has shutdown.
             _anomalyInProgress = null;
@@ -376,8 +376,8 @@ public void run() {
             postProcessAnomalyInProgress = true;
           }
           if (postProcessAnomalyInProgress) {
-            LOG.info("Post processing anomaly {}.", _anomalyInProgress);
-            postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs);
+            LOG.info("Post processing anomaly {} id: {}.", _anomalyInProgress, _anomalyInProgress.anomalyId());
+            postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs, false);
           }
         }
       }
     LOG.info("Anomaly handler exited.");
@@ -392,7 +392,7 @@ private void handleAnomalyInProgress() throws Exception {
     ExecutorState.State executionState = _kafkaCruiseControl.executionState();
     if (executionState != ExecutorState.State.NO_TASK_IN_PROGRESS && !_anomalyInProgress.stopOngoingExecution()) {
       LOG.info("Post processing anomaly {} because executor is in {} state.", _anomalyInProgress, executionState);
-      postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs);
+      postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs, false);
     } else {
       processAnomalyInProgress(anomalyType);
     }
@@ -415,7 +415,7 @@ private void processAnomalyInProgress(AnomalyType anomalyType) throws Exception
         break;
       case CHECK:
         LOG.info("Post processing anomaly {} for {}.", _anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
-        postProcessAnomalyInProgress(notificationResult.delay());
+        postProcessAnomalyInProgress(notificationResult.delay(), true);
         break;
       case IGNORE:
         _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.IGNORED);
@@ -475,8 +475,9 @@ private AnomalyNotificationResult notifyAnomalyInProgress(AnomalyType anomalyTyp
    * schedules a broker failure detection after the given delay.
    *
    * @param delayMs The delay for broker failure detection.
+   * @param carryForwardRetryCount {@code true} to carry the retry count forward; this is done only for check-with-delay of broker failure anomalies.
    */
-  private void postProcessAnomalyInProgress(long delayMs) {
+  private void postProcessAnomalyInProgress(long delayMs, boolean carryForwardRetryCount) {
     // Anomaly detector does delayed check for broker failures, otherwise it ignores the anomaly.
     if (_anomalyInProgress.anomalyType() == KafkaAnomalyType.BROKER_FAILURE) {
       synchronized (_shutdownLock) {
@@ -485,7 +486,17 @@ private void postProcessAnomalyInProgress(long delayMs) {
       } else {
         LOG.debug("Scheduling broker failure detection with delay of {} ms", delayMs);
         _numCheckedWithDelay.incrementAndGet();
-        _detectorScheduler.schedule(() -> _brokerFailureDetector.detectBrokerFailures(false), delayMs, TimeUnit.MILLISECONDS);
+        // By default, the retry count is 0.
+        int retryCount = 0;
+        if (carryForwardRetryCount) {
+          // Only in the check-with-delay case do we carry the retry count forward.
+          BrokerFailures brokerFailures = (BrokerFailures) _anomalyInProgress;
+          // Carry forward the count of anomaly fix checks done until now.
+          retryCount = brokerFailures.brokerFailureCheckWithDelayRetryCount() + 1;
+        }
+        int finalRetryCount = retryCount;
+        _detectorScheduler.schedule(() -> _brokerFailureDetector.detectBrokerFailures(false,
+            finalRetryCount), delayMs, TimeUnit.MILLISECONDS);
         _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
       }
     }
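
The seemingly redundant finalRetryCount copy exists because Java lambdas may only capture local variables that are final or effectively final, and retryCount is reassigned inside the if block, so it cannot be captured directly. A self-contained sketch of the same scheduling pattern (delay, values, and names chosen for illustration):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: rescheduling a check while carrying a retry count into a lambda.
public class CarryForwardSketch {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    boolean carryForwardRetryCount = true;
    int previousRetryCount = 4; // e.g. brokerFailures.brokerFailureCheckWithDelayRetryCount()

    int retryCount = 0;
    if (carryForwardRetryCount) {
      retryCount = previousRetryCount + 1; // reassignment: retryCount is no longer effectively final
    }
    // Copy into an effectively final local so the lambda below can capture it.
    int finalRetryCount = retryCount;
    scheduler.schedule(() -> System.out.println("re-checking, retry " + finalRetryCount),
                       10, TimeUnit.MILLISECONDS);

    scheduler.shutdown();
    scheduler.awaitTermination(1, TimeUnit.SECONDS);
  }
}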

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailures.java

+19
@@ -29,6 +29,7 @@ public class BrokerFailures extends KafkaAnomaly {
   protected Map<Integer, Long> _failedBrokers;
   protected RemoveBrokersRunnable _removeBrokersRunnable;
   protected boolean _fixable;
+  protected int _brokerFailureCheckWithDelayRetryCount;

   /**
    * An anomaly to indicate broker failure(s).
@@ -54,6 +55,10 @@ public boolean fixable() {
     return _fixable;
   }

+  public int brokerFailureCheckWithDelayRetryCount() {
+    return _brokerFailureCheckWithDelayRetryCount;
+  }
+
   @Override
   public boolean fix() throws KafkaCruiseControlException {
     boolean hasProposalsToFix = false;
@@ -90,6 +95,19 @@ public String toString() {
     return sb.toString();
   }

+  /**
+   * Configure the current retry count for the broker failure check with delay.
+   * @param configs The configuration map.
+   */
+  protected void configureBrokerFailureCheckWithDelayRetryCount(Map<String, ?> configs) {
+    if (configs.containsKey(AbstractBrokerFailureDetector.BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT)) {
+      _brokerFailureCheckWithDelayRetryCount = (int) configs.get(AbstractBrokerFailureDetector.BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT);
+    } else {
+      // If unset, use the default value of 0.
+      _brokerFailureCheckWithDelayRetryCount = 0;
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void configure(Map<String, ?> configs) {
@@ -100,6 +118,7 @@ public void configure(Map<String, ?> configs) {
       throw new IllegalArgumentException("Missing broker ids for failed brokers anomaly.");
     }
     _fixable = (Boolean) configs.get(AbstractBrokerFailureDetector.BROKER_FAILURES_FIXABLE_CONFIG);
+    configureBrokerFailureCheckWithDelayRetryCount(configs);
     _optimizationResult = null;
     KafkaCruiseControlConfig config = kafkaCruiseControl.config();
     boolean allowCapacityEstimation = config.getBoolean(ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG);
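
Putting the detector and anomaly sides together: the retry count is written into the parameter-override map under BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT and read back out in configure(), defaulting to 0 when the key is absent. A self-contained round-trip sketch of that pattern (the class below is a stand-in, not the real BrokerFailures):

import java.util.HashMap;
import java.util.Map;

// Sketch: round trip of a per-anomaly value through a config-override map.
public class RetryCountRoundTripSketch {
  static final String RETRY_COUNT_KEY = "broker.failure.check.with.delay.retry.count";

  private int _retryCount;

  void configure(Map<String, ?> configs) {
    if (configs.containsKey(RETRY_COUNT_KEY)) {
      _retryCount = (int) configs.get(RETRY_COUNT_KEY); // unboxes the stored Integer
    } else {
      _retryCount = 0; // if unset, default to 0, as in the patch
    }
  }

  public static void main(String[] args) {
    Map<String, Object> overrides = new HashMap<>();
    overrides.put(RETRY_COUNT_KEY, 3); // detector side: report the current retry count

    RetryCountRoundTripSketch anomaly = new RetryCountRoundTripSketch();
    anomaly.configure(overrides);            // anomaly side: read it back
    System.out.println(anomaly._retryCount); // prints 3

    anomaly.configure(new HashMap<String, Object>()); // no override present
    System.out.println(anomaly._retryCount);          // prints 0
  }
}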

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SelfHealingNotifier.java

+44 -4
@@ -66,8 +66,12 @@ public class SelfHealingNotifier implements AnomalyNotifier {
   public static final String SELF_HEALING_TOPIC_ANOMALY_ENABLED_CONFIG = "self.healing.topic.anomaly.enabled";
   public static final String SELF_HEALING_MAINTENANCE_EVENT_ENABLED_CONFIG = "self.healing.maintenance.event.enabled";
   public static final String BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG = "broker.failure.self.healing.threshold.ms";
+  public static final String BROKER_FAILURE_CHECK_WITH_DELAY_MAX_RETRY_COUNT = "broker.failure.check.with.delay.max.retry.count";
+  public static final String BROKER_FAILURE_CHECK_WITH_DELAY_INTERVAL_MS = "broker.failure.check.with.delay.interval.ms";
   static final long DEFAULT_ALERT_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(15);
   static final long DEFAULT_AUTO_FIX_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(30);
+  static final int DEFAULT_BROKER_FAILURE_CHECK_WITH_DELAY_MAX_RETRY_COUNT = 10;
+  static final long DEFAULT_BROKER_FAILURE_CHECK_WITH_DELAY_INTERVAL_MS = TimeUnit.MINUTES.toMillis(10);

   private static final Logger LOG = LoggerFactory.getLogger(SelfHealingNotifier.class);
   protected final Time _time;
@@ -77,6 +81,8 @@ public class SelfHealingNotifier implements AnomalyNotifier {
   protected final Map<AnomalyType, Long> _selfHealingEnabledHistoricalDurationMs;
   protected long _brokerFailureAlertThresholdMs;
   protected long _selfHealingThresholdMs;
+  protected int _brokerFailureCheckWithDelayMaxRetryCount;
+  protected long _brokerFailureCheckWithDelayIntervalMs;
   // A cache that keeps the most recent broker failure for each broker.
   protected final Map<Boolean, Map<Integer, Long>> _latestFailedBrokersByAutoFixTriggered;

@@ -248,11 +254,36 @@ public AnomalyNotificationResult onBrokerFailure(BrokerFailures brokerFailures)
       result = AnomalyNotificationResult.check(delayMs);
     } else {
       // Reached auto fix threshold. Alert and fix if self healing is enabled and anomaly is fixable.
-      boolean autoFixTriggered = _selfHealingEnabled.get(KafkaAnomalyType.BROKER_FAILURE) && brokerFailures.fixable();
-      if (hasNewFailureToAlert(brokerFailures, autoFixTriggered)) {
-        alert(brokerFailures, autoFixTriggered, selfHealingTimeMs, KafkaAnomalyType.BROKER_FAILURE);
+      boolean selfHealingEnabled = _selfHealingEnabled.get(KafkaAnomalyType.BROKER_FAILURE);
+      boolean brokerFailureFixable = brokerFailures.fixable();
+      boolean autoFixTriggered = selfHealingEnabled && brokerFailureFixable;
+
+      if (!brokerFailureFixable) {
+        // If the broker failure is not fixable, the anomaly can be ignored.
+        result = AnomalyNotificationResult.ignore();
+      } else if (selfHealingEnabled) {
+        // If self healing is enabled and the broker failure is fixable, the fix should be made.
+        if (hasNewFailureToAlert(brokerFailures, autoFixTriggered)) {
+          alert(brokerFailures, autoFixTriggered, selfHealingTimeMs, KafkaAnomalyType.BROKER_FAILURE);
+        }
+        result = AnomalyNotificationResult.fix();
+      } else {
+        // When self healing is disabled, we keep checking the anomaly until
+        // _brokerFailureCheckWithDelayMaxRetryCount retries have been attempted.
+        // Once that is exceeded, depending on the self-healing state, the anomaly is ignored or fixed.
+        // This check ensures that the broker failure is not simply ignored, while the retry cap
+        // ensures we do not keep checking forever if self healing stays disabled.
+        if (brokerFailures.brokerFailureCheckWithDelayRetryCount() <= _brokerFailureCheckWithDelayMaxRetryCount) {
+          // We can still retry the check with delay.
+          if (hasNewFailureToAlert(brokerFailures, autoFixTriggered)) {
+            alert(brokerFailures, autoFixTriggered, selfHealingTimeMs, KafkaAnomalyType.BROKER_FAILURE);
+          }
+          result = AnomalyNotificationResult.check(_brokerFailureCheckWithDelayIntervalMs);
+        } else {
+          // The max retry count has been reached, so the anomaly can be ignored.
+          result = AnomalyNotificationResult.ignore();
+        }
       }
-      result = autoFixTriggered ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
     }
     return result;
   }
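
Taken together, the rewritten branch is a three-way decision: unfixable failures are ignored immediately, fixable failures are fixed when self healing is on, and fixable failures with self healing off are re-checked up to the retry cap instead of being dropped (the bug this commit fixes). A condensed, self-contained restatement of that decision table (the enum and method names are illustrative):

// Condensed restatement of the onBrokerFailure decision added by this patch.
public class BrokerFailureDecisionSketch {
  enum Action { FIX, CHECK_WITH_DELAY, IGNORE }

  static Action decide(boolean fixable, boolean selfHealingEnabled,
                       int retryCount, int maxRetryCount) {
    if (!fixable) {
      return Action.IGNORE;                  // nothing self healing could do
    }
    if (selfHealingEnabled) {
      return Action.FIX;                     // past the threshold: fix now
    }
    // Self healing disabled: keep re-checking until the retry budget runs out,
    // so the anomaly is picked up if self healing is re-enabled in the meantime.
    return retryCount <= maxRetryCount ? Action.CHECK_WITH_DELAY : Action.IGNORE;
  }

  public static void main(String[] args) {
    System.out.println(decide(true, false, 0, 10));  // CHECK_WITH_DELAY
    System.out.println(decide(true, false, 11, 10)); // IGNORE
    System.out.println(decide(true, true, 0, 10));   // FIX
    System.out.println(decide(false, true, 0, 10));  // IGNORE
  }
}

With the defaults (max retry count 10, interval 10 minutes), a fixable failure under disabled self healing is therefore re-checked for about 10 x 10 = 100 minutes past the self-healing threshold before being given up on; since the count starts at 0 and the comparison is inclusive, the exact budget is an interval or two more in practice.
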
@@ -280,6 +311,15 @@ public void configure(Map<String, ?> config) {
     _brokerFailureAlertThresholdMs = alertThreshold == null ? DEFAULT_ALERT_THRESHOLD_MS : Long.parseLong(alertThreshold);
     String fixThreshold = (String) config.get(BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG);
     _selfHealingThresholdMs = fixThreshold == null ? DEFAULT_AUTO_FIX_THRESHOLD_MS : Long.parseLong(fixThreshold);
+
+    String brokerFailureCheckWithDelayMaxRetryCount = (String) config.get(BROKER_FAILURE_CHECK_WITH_DELAY_MAX_RETRY_COUNT);
+    _brokerFailureCheckWithDelayMaxRetryCount = brokerFailureCheckWithDelayMaxRetryCount == null
+        ? DEFAULT_BROKER_FAILURE_CHECK_WITH_DELAY_MAX_RETRY_COUNT : Integer.parseInt(brokerFailureCheckWithDelayMaxRetryCount);
+
+    String brokerFailureCheckWithDelayIntervalMs = (String) config.get(BROKER_FAILURE_CHECK_WITH_DELAY_INTERVAL_MS);
+    _brokerFailureCheckWithDelayIntervalMs = brokerFailureCheckWithDelayIntervalMs == null
+        ? DEFAULT_BROKER_FAILURE_CHECK_WITH_DELAY_INTERVAL_MS : Long.parseLong(brokerFailureCheckWithDelayIntervalMs);
+
     if (_brokerFailureAlertThresholdMs > _selfHealingThresholdMs) {
       throw new IllegalArgumentException(String.format("The failure detection threshold %d cannot be larger than "
                                                        + "the auto fix threshold. %d",
