Skip to content

Commit 144101a

Browse files
authored
KAFKA-19054: StreamThread exception handling with SHUTDOWN_APPLICATION may trigger a tight loop with MANY logs (#19394)
When the uncaught exception handler returns `SHUTDOWN_APPLICATION` in Kafka Streams, the stream thread can enter a tight loop during shutdown and flood the logs with the same warning message. This PR adds a rate-limit check so that the shutdown warning is emitted at most once every 10 seconds, thereby preventing log flooding. Reviewers: PoAn Yang <[email protected]>, Matthias J. Sax <[email protected]>
1 parent 1bb0c9a commit 144101a

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.kafka.common.serialization.Serdes;
2424
import org.apache.kafka.common.serialization.StringDeserializer;
2525
import org.apache.kafka.common.serialization.StringSerializer;
26+
import org.apache.kafka.common.utils.LogCaptureAppender;
27+
import org.apache.kafka.common.utils.MockTime;
2628
import org.apache.kafka.streams.KafkaStreams;
2729
import org.apache.kafka.streams.KeyValue;
2830
import org.apache.kafka.streams.StreamsBuilder;
@@ -39,10 +41,12 @@
3941
import org.apache.kafka.streams.processor.api.Processor;
4042
import org.apache.kafka.streams.processor.api.ProcessorContext;
4143
import org.apache.kafka.streams.processor.api.Record;
44+
import org.apache.kafka.streams.processor.internals.StreamThread;
4245
import org.apache.kafka.streams.state.Stores;
4346
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
4447
import org.apache.kafka.test.TestUtils;
4548

49+
import org.apache.logging.log4j.Level;
4650
import org.junit.jupiter.api.AfterAll;
4751
import org.junit.jupiter.api.AfterEach;
4852
import org.junit.jupiter.api.BeforeAll;
@@ -331,18 +335,23 @@ private void testShutdownApplication(final int numThreads) throws Exception {
331335
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
332336

333337
final Topology topology = builder.build();
334-
335-
try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, properties);
336-
final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, properties)) {
338+
final MockTime time = new MockTime(0L);
339+
340+
try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, properties, time);
341+
final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, properties, time);
342+
final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister()) {
337343
kafkaStreams1.setUncaughtExceptionHandler(exception -> SHUTDOWN_APPLICATION);
338344
kafkaStreams2.setUncaughtExceptionHandler(exception -> SHUTDOWN_APPLICATION);
345+
logCaptureAppender.setClassLogger(StreamThread.class, Level.WARN);
339346

340347
startApplicationAndWaitUntilRunning(asList(kafkaStreams1, kafkaStreams2));
341348

342349
produceMessages(NOW, inputTopic, "A");
343350
waitForApplicationState(asList(kafkaStreams1, kafkaStreams2), KafkaStreams.State.ERROR, DEFAULT_DURATION);
344351

345352
assertThat(processorValueCollector.size(), equalTo(1));
353+
assertThat("Shutdown warning log message should be exported exactly once",
354+
logCaptureAppender.getMessages("WARN").stream().filter(msg -> msg.contains("Detected that shutdown was requested")).count(), equalTo(1L));
346355
}
347356
}
348357

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ public boolean isStartingRunningOrPartitionAssigned() {
357357
// These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
358358
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
359359
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
360+
private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
360361
private final boolean eosEnabled;
361362
private final boolean stateUpdaterEnabled;
362363
private final boolean processingThreadsEnabled;
@@ -1029,8 +1030,14 @@ public void setStreamsUncaughtExceptionHandler(final BiConsumer<Throwable, Boole
10291030

10301031
public void maybeSendShutdown() {
10311032
if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
1032-
log.warn("Detected that shutdown was requested. " +
1033-
"All clients in this app will now begin to shutdown");
1033+
final long now = time.milliseconds();
1034+
final long lastLogged = lastShutdownWarningTimestamp.get();
1035+
if (now - lastLogged >= 10_000L) {
1036+
if (lastShutdownWarningTimestamp.compareAndSet(lastLogged, now)) {
1037+
log.warn("Detected that shutdown was requested. " +
1038+
"All clients in this app will now begin to shutdown");
1039+
}
1040+
}
10341041
mainConsumer.enforceRebalance("Shutdown requested");
10351042
}
10361043
}

0 commit comments

Comments
 (0)