Skip to content

Commit ed77397

Browse files
authored
KAFKA-19062: Port changes from KAFKA-18645 to share-consumers (#19335)
Limits waiting when closing a share consumer to request.timeout.ms. Reviewers: Andrew Schofield <[email protected]>
1 parent 4aa8120 commit ed77397

File tree

3 files changed

+22
-3
lines changed

3 files changed

+22
-3
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java

+6
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,12 @@ public void close() {
703703
* If the consumer is unable to complete acknowledgements and gracefully leave the group
704704
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
705705
* used to interrupt close.
706+
* <p>
707+
* The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which
708+
* only applies to operations performed with the broker (coordinator-related requests).
709+
* Even if a larger timeout is specified, the consumer will not wait longer than
710+
* {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
711+
* Note that the execution time of callbacks (such as {@link AcknowledgementCommitCallback}) do not consume time from the close timeout.
706712
*
707713
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
708714
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ private enum AcknowledgementMode {
201201
private final SubscriptionState subscriptions;
202202
private final ConsumerMetadata metadata;
203203
private final Metrics metrics;
204-
private final long defaultApiTimeoutMs;
204+
private final int requestTimeoutMs;
205+
private final int defaultApiTimeoutMs;
205206
private volatile boolean closed = false;
206207
// Init value is needed to avoid NPE in case of exception raised in the constructor
207208
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
@@ -250,6 +251,7 @@ private enum AcknowledgementMode {
250251
this.log = logContext.logger(getClass());
251252

252253
log.debug("Initializing the Kafka share consumer");
254+
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
253255
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
254256
this.time = time;
255257
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
@@ -369,6 +371,7 @@ private enum AcknowledgementMode {
369371
this.currentFetch = ShareFetch.empty();
370372
this.subscriptions = subscriptions;
371373
this.metadata = metadata;
374+
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
372375
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
373376
this.acknowledgementMode = initializeAcknowledgementMode(config, log);
374377
this.fetchBuffer = new ShareFetchBuffer(logContext);
@@ -450,7 +453,8 @@ private enum AcknowledgementMode {
450453
final Metrics metrics,
451454
final SubscriptionState subscriptions,
452455
final ConsumerMetadata metadata,
453-
final long defaultApiTimeoutMs,
456+
final int requestTimeoutMs,
457+
final int defaultApiTimeoutMs,
454458
final String groupId) {
455459
this.log = logContext.logger(getClass());
456460
this.subscriptions = subscriptions;
@@ -464,6 +468,7 @@ private enum AcknowledgementMode {
464468
this.backgroundEventReaper = backgroundEventReaper;
465469
this.metrics = metrics;
466470
this.metadata = metadata;
471+
this.requestTimeoutMs = requestTimeoutMs;
467472
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
468473
this.acknowledgementMode = initializeAcknowledgementMode(null, log);
469474
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
@@ -874,7 +879,7 @@ private void close(final Duration timeout, final boolean swallowException) {
874879
// We are already closing with a timeout, don't allow wake-ups from here on.
875880
wakeupTrigger.disableWakeups();
876881

877-
final Timer closeTimer = time.timer(timeout);
882+
final Timer closeTimer = createTimerForCloseRequests(timeout);
878883
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
879884
closeTimer.update();
880885

@@ -909,6 +914,12 @@ private void close(final Duration timeout, final boolean swallowException) {
909914
}
910915
}
911916

917+
private Timer createTimerForCloseRequests(Duration timeout) {
918+
// this.time could be null if an exception occurs in constructor prior to setting the this.time field
919+
final Time time = (this.time == null) ? Time.SYSTEM : this.time;
920+
return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
921+
}
922+
912923
/**
913924
* Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
914925
* 1. commit pending acknowledgements and close any share sessions

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ private ShareConsumerImpl<String, String> newConsumer(
149149
String clientId
150150
) {
151151
final int defaultApiTimeoutMs = 1000;
152+
final int requestTimeoutMs = 30000;
152153

153154
return new ShareConsumerImpl<>(
154155
new LogContext(),
@@ -164,6 +165,7 @@ private ShareConsumerImpl<String, String> newConsumer(
164165
new Metrics(),
165166
subscriptions,
166167
metadata,
168+
requestTimeoutMs,
167169
defaultApiTimeoutMs,
168170
groupId
169171
);

0 commit comments

Comments
 (0)