
Fix ignored broker failure anomalies when self healing is disabled #2270

Merged: 8 commits, Apr 28, 2025

@@ -39,6 +39,7 @@ public abstract class AbstractBrokerFailureDetector extends AbstractAnomalyDetec
public static final String FAILED_BROKERS_OBJECT_CONFIG = "failed.brokers.object";
// Config to indicate whether detected broker failures are fixable or not.
public static final String BROKER_FAILURES_FIXABLE_CONFIG = "broker.failures.fixable.object";
public static final String BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT = "broker.failure.check.with.delay.retry.count";

protected final Map<Integer, Long> _failedBrokers;
protected final File _failedBrokersFile;
@@ -62,6 +63,16 @@ public abstract class AbstractBrokerFailureDetector extends AbstractAnomalyDetec
* @param skipReportingIfNotUpdated {@code true} if broker failure reporting will be skipped if failed brokers have not changed.
*/
synchronized void detectBrokerFailures(boolean skipReportingIfNotUpdated) {
detectBrokerFailures(skipReportingIfNotUpdated, 0);
}

/**
* Detect broker failures. Skip reporting if the failed brokers have not changed and skipReportingIfNotUpdated is true.
*
* @param skipReportingIfNotUpdated {@code true} if broker failure reporting will be skipped if failed brokers have not changed.
* @param brokerFailureCheckWithDelayRetryCount The current check-with-delay retry count to associate with the reported broker failure anomaly.
*/
synchronized void detectBrokerFailures(boolean skipReportingIfNotUpdated, int brokerFailureCheckWithDelayRetryCount) {
try {
_aliveBrokers = aliveBrokers();

@@ -73,7 +84,8 @@ synchronized void detectBrokerFailures(boolean skipReportingIfNotUpdated) {
}
if (!skipReportingIfNotUpdated || updated) {
// Report the failures to anomaly detector to handle.
reportBrokerFailures();
// Pass the current retry count to the broker failure anomaly.
reportBrokerFailures(brokerFailureCheckWithDelayRetryCount);
}
} catch (Throwable e) {
LOG.warn("Broker failure detector received exception: ", e);
@@ -185,7 +197,7 @@ private boolean tooManyFailedBrokers(int failedBrokerCount, int aliveBrokerCount
|| (double) failedBrokerCount / (failedBrokerCount + aliveBrokerCount) > _fixableFailedBrokerPercentageThreshold;
}

private void reportBrokerFailures() {
private void reportBrokerFailures(int brokerFailureCheckWithDelayRetryCount) {
if (!_failedBrokers.isEmpty()) {
Map<String, Object> parameterConfigOverrides = new HashMap<>();
parameterConfigOverrides.put(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl);
@@ -194,6 +206,7 @@ private void reportBrokerFailures() {
parameterConfigOverrides.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, _kafkaCruiseControl.timeMs());
parameterConfigOverrides.put(BROKER_FAILURES_FIXABLE_CONFIG,
!tooManyFailedBrokers(failedBrokers.size(), _aliveBrokers.size()));
parameterConfigOverrides.put(BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT, brokerFailureCheckWithDelayRetryCount);

BrokerFailures brokerFailures = _kafkaCruiseControl.config().getConfiguredInstance(AnomalyDetectorConfig.BROKER_FAILURES_CLASS_CONFIG,
BrokerFailures.class,

@@ -350,7 +350,7 @@ public void run() {
_anomalyInProgress = null;
try {
_anomalyInProgress = _anomalies.take();
LOG.trace("Processing anomaly {}.", _anomalyInProgress);
LOG.trace("Processing anomaly id: {} {}.", _anomalyInProgress.anomalyId(), _anomalyInProgress);
if (_anomalyInProgress == SHUTDOWN_ANOMALY) {
// Service has shutdown.
_anomalyInProgress = null;
@@ -376,8 +376,8 @@ public void run() {
postProcessAnomalyInProgress = true;
}
if (postProcessAnomalyInProgress) {
LOG.info("Post processing anomaly {}.", _anomalyInProgress);
postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs);
LOG.info("Post processing anomaly {} id: {}.", _anomalyInProgress, _anomalyInProgress.anomalyId());
postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs, false);
}
}
LOG.info("Anomaly handler exited.");
@@ -392,7 +392,7 @@ private void handleAnomalyInProgress() throws Exception {
ExecutorState.State executionState = _kafkaCruiseControl.executionState();
if (executionState != ExecutorState.State.NO_TASK_IN_PROGRESS && !_anomalyInProgress.stopOngoingExecution()) {
LOG.info("Post processing anomaly {} because executor is in {} state.", _anomalyInProgress, executionState);
postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs);
postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs, false);
} else {
processAnomalyInProgress(anomalyType);
}
@@ -415,7 +415,7 @@ private void processAnomalyInProgress(AnomalyType anomalyType) throws Exception
break;
case CHECK:
LOG.info("Post processing anomaly {} for {}.", _anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
postProcessAnomalyInProgress(notificationResult.delay());
postProcessAnomalyInProgress(notificationResult.delay(), true);
break;
case IGNORE:
_anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.IGNORED);
@@ -475,8 +475,9 @@ private AnomalyNotificationResult notifyAnomalyInProgress(AnomalyType anomalyTyp
* schedules a broker failure detection after the given delay.
*
* @param delayMs The delay for broker failure detection.
* @param carryForwardRetryCount {@code true} to carry the retry count forward; this is done only for check-with-delay handling of broker failure anomalies.
*/
private void postProcessAnomalyInProgress(long delayMs) {
private void postProcessAnomalyInProgress(long delayMs, boolean carryForwardRetryCount) {
// Anomaly detector does delayed check for broker failures, otherwise it ignores the anomaly.
if (_anomalyInProgress.anomalyType() == KafkaAnomalyType.BROKER_FAILURE) {
synchronized (_shutdownLock) {
@@ -485,7 +486,17 @@ private void postProcessAnomalyInProgress(long delayMs) {
} else {
LOG.debug("Scheduling broker failure detection with delay of {} ms", delayMs);
_numCheckedWithDelay.incrementAndGet();
_detectorScheduler.schedule(() -> _brokerFailureDetector.detectBrokerFailures(false), delayMs, TimeUnit.MILLISECONDS);
// By default, the retry count is 0.
int retryCount = 0;
if (carryForwardRetryCount) {
// The retry count is carried forward only for check-with-delay handling.
BrokerFailures brokerFailures = (BrokerFailures) _anomalyInProgress;
// Increment over the number of delayed checks already done for this anomaly.
retryCount = brokerFailures.brokerFailureCheckWithDelayRetryCount() + 1;
}
int finalRetryCount = retryCount;
_detectorScheduler.schedule(() -> _brokerFailureDetector.detectBrokerFailures(false,
finalRetryCount), delayMs, TimeUnit.MILLISECONDS);
_anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
}
}

@@ -29,6 +29,7 @@ public class BrokerFailures extends KafkaAnomaly {
protected Map<Integer, Long> _failedBrokers;
protected RemoveBrokersRunnable _removeBrokersRunnable;
protected boolean _fixable;
protected int _brokerFailureCheckWithDelayRetryCount;

/**
* An anomaly to indicate broker failure(s).
@@ -54,6 +55,10 @@ public boolean fixable() {
return _fixable;
}

public int brokerFailureCheckWithDelayRetryCount() {
return _brokerFailureCheckWithDelayRetryCount;
}

@Override
public boolean fix() throws KafkaCruiseControlException {
boolean hasProposalsToFix = false;
@@ -90,6 +95,19 @@ public String toString() {
return sb.toString();
}

/**
* Configure the current retry count for the broker failure check with delay.
* @param configs The configuration map.
*/
protected void configureBrokerFailureCheckWithDelayRetryCount(Map<String, ?> configs) {
if (configs.containsKey(AbstractBrokerFailureDetector.BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT)) {
_brokerFailureCheckWithDelayRetryCount = (int) configs.get(AbstractBrokerFailureDetector.BROKER_FAILURE_CHECK_WITH_DELAY_RETRY_COUNT);
} else {
// If unset, default to 0.
_brokerFailureCheckWithDelayRetryCount = 0;
}
}

@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> configs) {
@@ -100,6 +118,7 @@ public void configure(Map<String, ?> configs) {
throw new IllegalArgumentException("Missing broker ids for failed brokers anomaly.");
}
_fixable = (Boolean) configs.get(AbstractBrokerFailureDetector.BROKER_FAILURES_FIXABLE_CONFIG);
configureBrokerFailureCheckWithDelayRetryCount(configs);
_optimizationResult = null;
KafkaCruiseControlConfig config = kafkaCruiseControl.config();
boolean allowCapacityEstimation = config.getBoolean(ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG);

@@ -66,8 +66,12 @@ public class SelfHealingNotifier implements AnomalyNotifier {
public static final String SELF_HEALING_TOPIC_ANOMALY_ENABLED_CONFIG = "self.healing.topic.anomaly.enabled";
public static final String SELF_HEALING_MAINTENANCE_EVENT_ENABLED_CONFIG = "self.healing.maintenance.event.enabled";
public static final String BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG = "broker.failure.self.healing.threshold.ms";
public static final String BROKER_FAILURE_CHECK_WITH_DELAY_MAX_RETRY_COUNT = "broker.failure.check.with.delay.max.retry.count";
public static final String BROKER_FAILURE_CHECK_WITH_DELAY_INTERVAL_MS = "broker.failure.check.with.delay.interval.ms";
static final long DEFAULT_ALERT_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(15);
static final long DEFAULT_AUTO_FIX_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(30);
static final int DEFAULT_BROKER_FAILURE_CHECK_WITH_DELAY_MAX_RETRY_COUNT = 10;
static final long DEFAULT_BROKER_FAILURE_CHECK_WITH_DELAY_INTERVAL_MS = TimeUnit.MINUTES.toMillis(10);

private static final Logger LOG = LoggerFactory.getLogger(SelfHealingNotifier.class);
protected final Time _time;
@@ -77,6 +81,8 @@ public class SelfHealingNotifier implements AnomalyNotifier {
protected final Map<AnomalyType, Long> _selfHealingEnabledHistoricalDurationMs;
protected long _brokerFailureAlertThresholdMs;
protected long _selfHealingThresholdMs;
protected int _brokerFailureCheckWithDelayMaxRetryCount;
protected long _brokerFailureCheckWithDelayIntervalMs;
// A cache that keeps the most recent broker failure for each broker.
protected final Map<Boolean, Map<Integer, Long>> _latestFailedBrokersByAutoFixTriggered;

@@ -248,11 +254,36 @@ public AnomalyNotificationResult onBrokerFailure(BrokerFailures brokerFailures)
result = AnomalyNotificationResult.check(delayMs);
} else {
// Reached auto fix threshold. Alert and fix if self healing is enabled and anomaly is fixable.
boolean autoFixTriggered = _selfHealingEnabled.get(KafkaAnomalyType.BROKER_FAILURE) && brokerFailures.fixable();
if (hasNewFailureToAlert(brokerFailures, autoFixTriggered)) {
alert(brokerFailures, autoFixTriggered, selfHealingTimeMs, KafkaAnomalyType.BROKER_FAILURE);
boolean selfHealingEnabled = _selfHealingEnabled.get(KafkaAnomalyType.BROKER_FAILURE);
boolean brokerFailureFixable = brokerFailures.fixable();
boolean autoFixTriggered = selfHealingEnabled && brokerFailures.fixable();

if (!brokerFailureFixable) {
// If the broker failure is not fixable, the anomaly can be ignored.
result = AnomalyNotificationResult.ignore();
} else if (selfHealingEnabled) {
// If self healing is enabled and the broker failure is fixable, the fix should be made.
if (hasNewFailureToAlert(brokerFailures, autoFixTriggered)) {
alert(brokerFailures, autoFixTriggered, selfHealingTimeMs, KafkaAnomalyType.BROKER_FAILURE);
}
result = AnomalyNotificationResult.fix();
} else {
// When self healing is disabled, keep checking the anomaly with a delay until we have tried
// _brokerFailureCheckWithDelayMaxRetryCount times. If self healing is re-enabled during that window,
// a later check takes the fix path above; once the retry count is exceeded the anomaly is ignored.
// This ensures a broker failure is not silently ignored, while the cap prevents checking forever
// when self healing stays disabled.
if (brokerFailures.brokerFailureCheckWithDelayRetryCount() <= _brokerFailureCheckWithDelayMaxRetryCount) {
// Still within the retry budget, so check again with a delay.
if (hasNewFailureToAlert(brokerFailures, autoFixTriggered)) {
alert(brokerFailures, autoFixTriggered, selfHealingTimeMs, KafkaAnomalyType.BROKER_FAILURE);
[Review comments on the alert(...) call above]
Contributor: Seems like we are alerting on each retry. Would this cause noisy alerts?

bsandeep23 (author, Apr 24, 2025): hasNewFailureToAlert — it is doing this check before alerting, so I am assuming this will alert only if a new broker fails.

Contributor: Makes sense. Let's confirm it.

bsandeep23 (author): Confirmed from cc logs. Disabled self healing, induced a broker failure, and saw that it is getting checked multiple times while the log in the alert method is printed only once. code #link

LOG.warn("{} detected {}. Self healing {}.", anomalyType, anomaly,
             _selfHealingEnabled.get(anomalyType) ? String.format("start time %s", utcDateFor(selfHealingStartTime)) : "is disabled");
$ grep "BROKER_FAILURE detected" logs/likafka-cruise-control.log
2025/04/25 13:28:10.135 WARN [SelfHealingNotifier] [AnomalyDetector-2] [kafka-cruise-control] [] BROKER_FAILURE detected {Fixable broker failures detected: {Broker 6228 failed at 2025-04-25T13:13:10Z}}. Self healing is disabled.
$ ./parse.sh
anomalyId	brokerId	status	detectionTime	statusUpdateTime
b4494501-950d-4e16-87dd-fc432a69cacd	6228	CHECK_WITH_DELAY	2025-04-25 13:13:10	2025-04-25 13:13:10
7a511679-7112-478d-bebd-83f8fc818e7e	6228	CHECK_WITH_DELAY	2025-04-25 13:28:10	2025-04-25 13:28:10
b2c34eef-8f4a-42a6-a485-d8427bc98a5a	6228	CHECK_WITH_DELAY	2025-04-25 13:43:10	2025-04-25 13:43:10
cb033f75-af05-471a-bf97-ebcf1ea3b3db	6228	CHECK_WITH_DELAY	2025-04-25 13:53:10	2025-04-25 13:53:10

}
result = AnomalyNotificationResult.check(_brokerFailureCheckWithDelayIntervalMs);
} else {
// Max retry count reached; ignore the anomaly.
result = AnomalyNotificationResult.ignore();
}
}
result = autoFixTriggered ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
}
return result;
}
@@ -280,6 +311,15 @@ public void configure(Map<String, ?> config) {
_brokerFailureAlertThresholdMs = alertThreshold == null ? DEFAULT_ALERT_THRESHOLD_MS : Long.parseLong(alertThreshold);
String fixThreshold = (String) config.get(BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG);
_selfHealingThresholdMs = fixThreshold == null ? DEFAULT_AUTO_FIX_THRESHOLD_MS : Long.parseLong(fixThreshold);

String brokerFailureCheckWithDelayMaxRetryCount = (String) config.get(BROKER_FAILURE_CHECK_WITH_DELAY_MAX_RETRY_COUNT);
_brokerFailureCheckWithDelayMaxRetryCount = brokerFailureCheckWithDelayMaxRetryCount == null
? DEFAULT_BROKER_FAILURE_CHECK_WITH_DELAY_MAX_RETRY_COUNT : Integer.parseInt(brokerFailureCheckWithDelayMaxRetryCount);

String brokerFailureCheckWithDelayIntervalMs = (String) config.get(BROKER_FAILURE_CHECK_WITH_DELAY_INTERVAL_MS);
_brokerFailureCheckWithDelayIntervalMs = brokerFailureCheckWithDelayIntervalMs == null
? DEFAULT_BROKER_FAILURE_CHECK_WITH_DELAY_INTERVAL_MS : Long.parseLong(brokerFailureCheckWithDelayIntervalMs);

if (_brokerFailureAlertThresholdMs > _selfHealingThresholdMs) {
throw new IllegalArgumentException(String.format("The failure detection threshold %d cannot be larger than "
+ "the auto fix threshold. %d",
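
For operators, a minimal sketch of how the new notifier settings could be supplied, assuming they are set in cruisecontrol.properties alongside the existing SelfHealingNotifier options such as broker.failure.alert.threshold.ms (the values shown are the defaults introduced by this PR, not recommendations):

# Maximum number of delayed re-checks for a fixable broker failure while self healing is disabled (default 10).
broker.failure.check.with.delay.max.retry.count=10
# Delay between those re-checks, in milliseconds (default 10 minutes).
broker.failure.check.with.delay.interval.ms=600000

Once the retry count carried by the BrokerFailures anomaly exceeds the configured maximum, the notifier ignores the anomaly instead of scheduling another check.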