-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-19062: Port changes from KAFKA-18645 to share-consumers #19335
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Just a few small comments.
* <p> | ||
* The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which | ||
* only applies to operations performed with the broker (coordinator-related requests and | ||
* fetch sessions). Even if a larger timeout is specified, the consumer will not wait longer than |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "fetch session" is not a thing for a share consumer. Maybe "operations performed with the broker, such as coordinator-related requests." would suffice.
@@ -250,6 +251,7 @@ private enum AcknowledgementMode { | |||
this.log = logContext.logger(getClass()); | |||
|
|||
log.debug("Initializing the Kafka share consumer"); | |||
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); | |||
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that requestTimeoutMs
and defaultApiTimeoutMs
ought to have the same data type. Please choose the best one and make them the same, which I have a feeling would be int
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, yeah makes sense. I have updated it to int
. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ShivsundarR : Thanks for the patch.
LGTM
…#19335) Limits waiting when closing a share consumer to request.timeout.ms. Reviewers: Andrew Schofield <[email protected]>
https://issues.apache.org/jira/browse/KAFKA-19062
After 6636316 , we handle the close timeout in
AsyncKafkaConsumer
by taking the minimum of user given timeout and the default request timeout config.These changes need to applied to ShareConsumers as well.