Skip to content

KAFKA-19062: Port changes from KAFKA-18645 to share-consumers #19335

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ public void close() {
* If the consumer is unable to complete acknowledgements and gracefully leave the group
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
* used to interrupt close.
* <p>
* The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which
* only applies to operations performed with the broker (coordinator-related requests).
* Even if a larger timeout is specified, the consumer will not wait longer than
* {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
* Note that the execution time of callbacks (such as {@link AcknowledgementCommitCallback}) do not consume time from the close timeout.
*
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ private enum AcknowledgementMode {
private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
private final Metrics metrics;
private final long defaultApiTimeoutMs;
private final int requestTimeoutMs;
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
Expand Down Expand Up @@ -250,6 +251,7 @@ private enum AcknowledgementMode {
this.log = logContext.logger(getClass());

log.debug("Initializing the Kafka share consumer");
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that requestTimeoutMs and defaultApiTimeoutMs ought to have the same data type. Please choose the best one and make them the same, which I have a feeling would be int.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, yeah makes sense. I have updated it to int. Thanks.

this.time = time;
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
Expand Down Expand Up @@ -369,6 +371,7 @@ private enum AcknowledgementMode {
this.currentFetch = ShareFetch.empty();
this.subscriptions = subscriptions;
this.metadata = metadata;
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.acknowledgementMode = initializeAcknowledgementMode(config, log);
this.fetchBuffer = new ShareFetchBuffer(logContext);
Expand Down Expand Up @@ -450,7 +453,8 @@ private enum AcknowledgementMode {
final Metrics metrics,
final SubscriptionState subscriptions,
final ConsumerMetadata metadata,
final long defaultApiTimeoutMs,
final int requestTimeoutMs,
final int defaultApiTimeoutMs,
final String groupId) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
Expand All @@ -464,6 +468,7 @@ private enum AcknowledgementMode {
this.backgroundEventReaper = backgroundEventReaper;
this.metrics = metrics;
this.metadata = metadata;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.acknowledgementMode = initializeAcknowledgementMode(null, log);
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
Expand Down Expand Up @@ -874,7 +879,7 @@ private void close(final Duration timeout, final boolean swallowException) {
// We are already closing with a timeout, don't allow wake-ups from here on.
wakeupTrigger.disableWakeups();

final Timer closeTimer = time.timer(timeout);
final Timer closeTimer = createTimerForCloseRequests(timeout);
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
closeTimer.update();

Expand Down Expand Up @@ -909,6 +914,12 @@ private void close(final Duration timeout, final boolean swallowException) {
}
}

private Timer createTimerForCloseRequests(Duration timeout) {
// this.time could be null if an exception occurs in constructor prior to setting the this.time field
final Time time = (this.time == null) ? Time.SYSTEM : this.time;
return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
}

/**
* Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
* 1. commit pending acknowledgements and close any share sessions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private ShareConsumerImpl<String, String> newConsumer(
String clientId
) {
final int defaultApiTimeoutMs = 1000;
final int requestTimeoutMs = 30000;

return new ShareConsumerImpl<>(
new LogContext(),
Expand All @@ -164,6 +165,7 @@ private ShareConsumerImpl<String, String> newConsumer(
new Metrics(),
subscriptions,
metadata,
requestTimeoutMs,
defaultApiTimeoutMs,
groupId
);
Expand Down