KAFKA-16758: Extend Consumer#close with an option to leave the group or not #17614
...nts/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 *
 * @see GroupMembershipOperation
 */
protected GroupMembershipOperation leaveGroupOperation = GroupMembershipOperation.DEFAULT;
Due to the architecture of the async consumer, it's difficult to pass GroupMembershipOperation directly to AbstractMembershipManager, so I added a property to hold the behavior for leaving the group, which remains unchanged except when the consumer is closing.
...nts/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
e4e6468 to a196516
a196516 to 73205b4
Hello @ableegoldman @mjsax @chia7712
Made a quick pass, though it would be good if someone more familiar with the consumer internals could take a look as well.
Also, I take it you plan to fix the KafkaStreams#close behavior and remove the internal.leave.group.on.close flag in a separate followup PR?
 * Enum to specify the group membership operation upon leaving group.
 * {@code LEAVE_GROUP} means the consumer will leave the group.
 * {@code REMAIN_IN_GROUP} means the consumer will remain in the group.
 * {@code DEFAULT} applies the default behavior, which may depend on whether the consumer is static or dynamic.
Should we clarify what the actual behavior is for static vs dynamic?
That's a good point. I'll include some additional description for it. 😄
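To make the static vs dynamic question concrete, here is a minimal sketch of how the three enum values could resolve to a leave/remain decision. It assumes that DEFAULT means dynamic members leave while static members remain; that assumption, the class name `CloseBehaviorSketch`, and the helper `leavesGroupOnClose` are illustrative and are not taken from the PR.

```java
// Illustrative sketch only; not the Kafka client implementation.
final class CloseBehaviorSketch {
    // Mirrors the enum constants named in the Javadoc under review.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    /**
     * Resolves whether a closing consumer leaves the group.
     * Assumption: under DEFAULT, dynamic members leave and static members remain.
     */
    static boolean leavesGroupOnClose(GroupMembershipOperation op, boolean isStaticMember) {
        switch (op) {
            case LEAVE_GROUP:
                return true;
            case REMAIN_IN_GROUP:
                return false;
            default:
                // DEFAULT: decided by the membership type (assumed rule).
                return !isStaticMember;
        }
    }

    public static void main(String[] args) {
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            System.out.println(op + ": dynamic leaves=" + leavesGroupOnClose(op, false)
                + ", static leaves=" + leavesGroupOnClose(op, true));
        }
    }
}
```

Spelling the DEFAULT rule out in the Javadoc in this way would let callers predict the outcome without reading the coordinator code.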
// If membershipOperation is DEFAULT, leave group based on rebalanceConfig.leaveGroupOnClose.
// Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
if (GroupMembershipOperation.REMAIN_IN_GROUP != membershipOperation &&
    (GroupMembershipOperation.LEAVE_GROUP == membershipOperation || rebalanceConfig.leaveGroupOnClose)) {
The "leaveGroupOnClose" rebalance config is an internal backdoor that we put in for Streams to get around the lack of this API -- so once we update the corresponding KafkaStreams#close method we should remember to remove this config and clean up the code. The secret "internal" config was always a bit hacky, so it's nice to finally get rid of it.
Also, it's a bit weird to be applying this check in two places. I think we should remove this check entirely and only have the one in #maybeLeaveGroup.
(Frankly, it was kind of weird to put the leaveGroupOnClose check here to begin with; imo it should have been part of the #maybeLeaveGroup check. So I'd just remove the entire if condition and move the leaveGroupOnClose config check to the #maybeLeaveGroup check until we can remove it entirely.)
Am I correct in thinking that if we decide to move the if condition into maybeLeaveGroup, we should also move onLeavePrepare into maybeLeaveGroup?
If that's the case, it might be tricky.
Currently, I assume that onLeavePrepare and maybeLeaveGroup should be invoked consecutively, since they share the same if condition. However, at the moment, there are many places in both test code and production code that only invoke maybeLeaveGroup.
If we proceed with this refactor, we will need to ensure that we do not break the current behavior.
Here's a rough idea:
public synchronized RequestFuture<Void> maybeLeaveGroup(GroupMembershipOperation membershipOperation, String leaveReason) {
    // If membershipOperation is REMAIN_IN_GROUP, never send leave group request.
    if (GroupMembershipOperation.REMAIN_IN_GROUP == membershipOperation) {
        return null;
    }
    // If membershipOperation is DEFAULT, leave group based on rebalanceConfig.leaveGroupOnClose.
    // Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
    if (GroupMembershipOperation.LEAVE_GROUP != membershipOperation && !rebalanceConfig.leaveGroupOnClose) {
        return null;
    }
    onLeavePrepare();
    return leaveGroup(membershipOperation, leaveReason);
}
public RequestFuture<Void> leaveGroup(GroupMembershipOperation membershipOperation, String leaveReason) {
    RequestFuture<Void> future = null;
    if (shouldSendLeaveGroupRequest(membershipOperation)) {
        log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
            generation.memberId, coordinator, leaveReason);
        LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
            rebalanceConfig.groupId,
            List.of(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
        );
        future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));
        client.pollNoWakeup();
    }
    resetGenerationOnLeaveGroup();
    return future;
}
We can move the if condition and onLeavePrepare into maybeLeaveGroup, while extracting the original logic of maybeLeaveGroup into a new public helper method.
Code that previously invoked maybeLeaveGroup directly will instead call the new helper method.
WDYT?
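As a sanity check on the proposed refactor, the sketch below compares the single combined condition from the diff with the two early returns in the rough idea above, over every combination of operation and config flag. The class `LeaveGuardSketch` and its method names are hypothetical stand-ins; only the boolean logic is copied from the thread.

```java
// Illustrative sketch only; the surrounding coordinator code is omitted.
final class LeaveGuardSketch {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // The single combined condition from the diff under review.
    static boolean combined(GroupMembershipOperation op, boolean leaveGroupOnClose) {
        return GroupMembershipOperation.REMAIN_IN_GROUP != op
            && (GroupMembershipOperation.LEAVE_GROUP == op || leaveGroupOnClose);
    }

    // The two early returns from the proposed maybeLeaveGroup, reduced to "proceed or not".
    static boolean earlyReturns(GroupMembershipOperation op, boolean leaveGroupOnClose) {
        if (GroupMembershipOperation.REMAIN_IN_GROUP == op) {
            return false;
        }
        if (GroupMembershipOperation.LEAVE_GROUP != op && !leaveGroupOnClose) {
            return false;
        }
        return true;
    }

    // Exhaustively compares both formulations over all six input combinations.
    static boolean equivalentEverywhere() {
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            for (boolean flag : new boolean[] {true, false}) {
                if (combined(op, flag) != earlyReturns(op, flag)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("equivalent: " + equivalentEverywhere());
    }
}
```

The guard clauses therefore preserve the existing decision; the open question in the thread is only where onLeavePrepare should run.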
I agree with @ableegoldman that we should check the new option in maybeLeaveGroup. Additionally, since we haven't discussed it in the KIP, should we skip the callback if the state is REMAIN_IN_GROUP? I prefer not to modify the callback to minimize changes.
I will initiate a discussion on our KIP thread.
Replied on the KIP discussion thread. After some thought, it's actually a bit more complicated than I initially imagined -- so I'd be interested to hear all your thoughts.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CloseOptionInternal.java
Hi @lianetm,
53ffc57 to b457537
 * Fluent method to set the group membership operation upon shutdown.
 *
 * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
 * or {@code DEFAULT}.
nit: align start
b457537 to 653ce16
653ce16 to bdb5113
Thanks for the PR @frankvicky.
I didn't make it as far as the tests, but I left a few comments. Thanks!
...nts/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CloseOptionInternal.java
AbstractMembershipManager<R> membershipManager = membershipManager();
GroupMembershipOperation leaveGroupOperation = membershipManager.leaveGroupOperation();

if (membershipManager.isLeavingGroup() &&
    // Default operation: both static and dynamic consumers will send a leave heartbeat
    (GroupMembershipOperation.DEFAULT == leaveGroupOperation ||
    // Leave operation: both static and dynamic consumers will send a leave heartbeat
    GroupMembershipOperation.LEAVE_GROUP == leaveGroupOperation ||
    // Remain in group: only static consumers will send a leave heartbeat, while dynamic members will not
    membershipManager.groupInstanceId().isPresent())
) {
Could we let this logic live inside the MembershipManager.isLeavingGroup() implementation rather than exposing it here? The MembershipManager already manages the group instance ID and GroupMembershipOperation values internally. Then we wouldn't have to add the other AbstractMembershipManager changes, right?
Reminder on this question.
bdb5113 to ab69d48
@frankvicky thanks for this patch. I left a couple of comments. PTAL
 */
void close(final CloseOption option);

class CloseOption {
Perhaps we can move the close method out of the Consumer class. Currently, users need to call consumer.close(Consumer.CloseOption.timeout(Duration.xxx)), which is somewhat verbose. Simplifying this could enhance the user experience. :(
 * or {@code DEFAULT}.
 * @return a new {@code CloseOption} instance with the specified group membership operation.
 */
public static CloseOption groupMembershipOperation(final GroupMembershipOperation operation) {
Could you please file a follow-up to increase the test coverage?
 * @return this {@code CloseOption} instance.
 */
public CloseOption withGroupMembershipOperation(final GroupMembershipOperation operation) {
    this.operation = operation;
please add null check
 */
public static CloseOption groupMembershipOperation(final GroupMembershipOperation operation) {
    CloseOption option = new CloseOption();
    option.operation = operation;
please add null check
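One way to address both null-check requests is to route the static factories through the fluent setters, so that each argument is validated in exactly one place. The sketch below is a hypothetical stand-in named `CloseOptionSketch`; the field layout, the use of `Optional<Duration>`, and the error messages are assumptions rather than the PR's actual code.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for the CloseOption class under review; not the Kafka source.
final class CloseOptionSketch {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    private Optional<Duration> timeout = Optional.empty();
    private GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;

    private CloseOptionSketch() { }

    // Static factories delegate to the fluent setters so the null checks live in one place.
    static CloseOptionSketch timeout(Duration timeout) {
        return new CloseOptionSketch().withTimeout(timeout);
    }

    static CloseOptionSketch groupMembershipOperation(GroupMembershipOperation operation) {
        return new CloseOptionSketch().withGroupMembershipOperation(operation);
    }

    CloseOptionSketch withTimeout(Duration timeout) {
        this.timeout = Optional.of(Objects.requireNonNull(timeout, "timeout must not be null"));
        return this;
    }

    CloseOptionSketch withGroupMembershipOperation(GroupMembershipOperation operation) {
        this.operation = Objects.requireNonNull(operation, "operation must not be null");
        return this;
    }

    Optional<Duration> timeout() {
        return timeout;
    }

    GroupMembershipOperation operation() {
        return operation;
    }

    public static void main(String[] args) {
        CloseOptionSketch option = CloseOptionSketch.timeout(Duration.ofSeconds(5))
            .withGroupMembershipOperation(GroupMembershipOperation.REMAIN_IN_GROUP);
        System.out.println(option.timeout() + " " + option.operation());
    }
}
```

Failing fast with a NullPointerException at construction time keeps a null operation from surfacing later, deep inside the close path.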
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
return groupInstanceId;
@Override
public void leaveGroupOperationOnClose(GroupMembershipOperation operation) {
    if (GroupMembershipOperation.DEFAULT.equals(operation)) {
Why do we need this check? We eventually need to handle the DEFAULT for different cases, right?
ab69d48 to 8030579
Thanks @frankvicky. Left some more comments.
...nts/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Almost there!
// If the consumer has dynamic membership,
// we should skip the leaving heartbeat when leaveGroupOperation is REMAIN_IN_GROUP
boolean skipHeartbeatForDynamicMemberRemainInGroup = membershipManager.groupInstanceId().isEmpty()
    && REMAIN_IN_GROUP == membershipManager.leaveGroupOperation();
very minor nit: the double negative with "!skipX" is a bit confusing, instead of "skipHeartbeat..." can we invert the variable to "shouldHeartbeat"?
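The inversion suggested here is a direct application of De Morgan's law. The sketch below shows the "skip" flag next to a positive "shouldHeartbeat" flag and checks that one is always the negation of the other. The class `HeartbeatFlagSketch` and the boolean parameter `hasGroupInstanceId` are hypothetical simplifications of the membership manager calls in the diff.

```java
// Illustrative sketch only; the real code reads these values from the membership manager.
final class HeartbeatFlagSketch {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Original form: true only for a dynamic member that wants to remain in the group.
    static boolean skipHeartbeat(boolean hasGroupInstanceId, GroupMembershipOperation op) {
        return !hasGroupInstanceId && GroupMembershipOperation.REMAIN_IN_GROUP == op;
    }

    // Inverted form: static members always heartbeat; dynamic members unless REMAIN_IN_GROUP.
    static boolean shouldHeartbeat(boolean hasGroupInstanceId, GroupMembershipOperation op) {
        return hasGroupInstanceId || GroupMembershipOperation.REMAIN_IN_GROUP != op;
    }

    // Confirms shouldHeartbeat == !skipHeartbeat for every input combination.
    static boolean formsAgree() {
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            for (boolean isStatic : new boolean[] {true, false}) {
                if (shouldHeartbeat(isStatic, op) == skipHeartbeat(isStatic, op)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("forms agree: " + formsAgree());
    }
}
```

With the positive flag, the call site reads as `isLeavingGroup() && shouldHeartbeat` with no negation to untangle.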
@@ -182,7 +181,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 }

 // Case 1: The member is leaving
-boolean heartbeatNow = membershipManager().state() == MemberState.LEAVING ||
+boolean heartbeatNow = isLeavingGroup() ||
why add the #isLeavingGroup method to the AbstractHeartbeatRequestManager instead of just calling membershipManager#isLeavingGroup here?
Basically I'm wondering why both the HeartbeatRequestManager and MembershipManager classes need to have this #isLeavingGroup method
membershipManager#isLeavingGroup considers LEAVING and PREPARE_LEAVING to be leaving the group.
But in this flag, we only consider LEAVING since the flag determines whether the heartbeat should be sent immediately or not. PREPARE_LEAVING is the state that waits for callbacks to be executed.
The key point is that it's confusing that HeartbeatRequestManager and MembershipManager have a method with the same name. Clearly, HeartbeatRequestManager cares about the timing of a leave request, not the member state.
I will figure out a more appropriate name.
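The distinction described above can be sketched with two predicates over the member state. The enum below is a reduced subset chosen for illustration (the real MemberState has more values), and `shouldSendLeaveHeartbeatNow` is a hypothetical name offered only as an example of a less ambiguous alternative.

```java
// Illustrative sketch only; MemberState here is a reduced, assumed subset.
final class LeaveStateSketch {
    enum MemberState { STABLE, PREPARE_LEAVING, LEAVING }

    // Membership view: the member is on its way out, including while callbacks are still running.
    static boolean isLeavingGroup(MemberState state) {
        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
    }

    // Heartbeat view: send the leave heartbeat immediately only once callbacks have completed.
    static boolean shouldSendLeaveHeartbeatNow(MemberState state) {
        return state == MemberState.LEAVING;
    }

    public static void main(String[] args) {
        for (MemberState state : MemberState.values()) {
            System.out.println(state + ": isLeavingGroup=" + isLeavingGroup(state)
                + ", sendNow=" + shouldSendLeaveHeartbeatNow(state));
        }
    }
}
```

The two predicates differ only for PREPARE_LEAVING, which is why giving them the same name on two classes is easy to misread.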
// Determine if we should send a leaving heartbeat:
// - For static membership (when groupInstanceId is present): Always send the leaving heartbeat
// - For dynamic membership: Send the leaving heartbeat only when leaveGroupOperation is not REMAIN_IN_GROUP
boolean shouldHeartbeat = membershipManager.groupInstanceId().isPresent()
IIUC the only difference in this #pollOnClose implementation from the original one in the abstract class is the addition of this shouldHeartbeat condition to the if membershipManager().isLeavingGroup() check.
However it seems like the extra condition is built entirely of calls to membershipManager methods so I'm wondering if it doesn't make more sense to just incorporate the shouldHeartbeat condition into the #isLeavingGroup method of the ConsumerMembershipManager class? That way we don't need to make #pollOnClose abstract and can cut down on some redundant code
Also from a quick look at the ConsumerMembershipManager#isLeavingGroup implementation, it already includes checks on both of these conditions (groupInstanceId.isPresent and REMAIN_IN_GROUP == membershipManager.leaveGroupOperation()). imo it would be cleaner and easier to read if we just encapsulated all the logic around these conditions inside the #isLeavingGroup method
Yes, this could significantly reduce redundancy!
Previously, I thought it was clearer for the async consumer and the share consumer to each have their own implementation, but that no longer provides much value.
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
boolean isStaticMember = groupInstanceId.isPresent();
if (REMAIN_IN_GROUP == leaveGroupOperation && isStaticMember) {
this condition is basically redundant, right? if isStaticMember is true then we'll return LEAVE_GROUP_STATIC_MEMBER_EPOCH anyways via the final check
Yes, I should clean this method up after #17614 (comment)
Now, the matrix of membership and epoch is:
| | REMAIN | LEAVE | DEFAULT |
|---|---|---|---|
| dynamic | Don't send | -1 | -1 |
| static | -2 | -1 | -2 |
Given that, we could remove the if (REMAIN_IN_GROUP == leaveGroupOperation && isStaticMember) condition.
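The matrix above can be written as a small lookup, which also shows why the extra static-member branch is unnecessary. In this sketch the constant names are borrowed from the diff and the values -1 and -2 from the matrix; the class `LeaveEpochSketch`, the method `leaveEpoch`, and the use of `OptionalInt` to model "Don't send" are assumptions made for illustration.

```java
import java.util.OptionalInt;

// Illustrative sketch only; it encodes the matrix from the thread, not the Kafka source.
final class LeaveEpochSketch {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    /** Returns the epoch for the leave heartbeat, or empty when no heartbeat is sent. */
    static OptionalInt leaveEpoch(GroupMembershipOperation op, boolean isStaticMember) {
        if (op == GroupMembershipOperation.LEAVE_GROUP) {
            // An explicit leave uses -1 for both static and dynamic members.
            return OptionalInt.of(LEAVE_GROUP_MEMBER_EPOCH);
        }
        if (isStaticMember) {
            // REMAIN and DEFAULT are identical for static members, so one branch covers both.
            return OptionalInt.of(LEAVE_GROUP_STATIC_MEMBER_EPOCH);
        }
        if (op == GroupMembershipOperation.REMAIN_IN_GROUP) {
            // Dynamic member that remains: no leave heartbeat at all.
            return OptionalInt.empty();
        }
        // Dynamic member with DEFAULT.
        return OptionalInt.of(LEAVE_GROUP_MEMBER_EPOCH);
    }

    public static void main(String[] args) {
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            System.out.println(op + ": dynamic=" + leaveEpoch(op, false)
                + ", static=" + leaveEpoch(op, true));
        }
    }
}
```

Because the static row has the same value for REMAIN and DEFAULT, a single `isStaticMember` check after the LEAVE_GROUP case is enough.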
if (LEAVE_GROUP == leaveGroupOperation) {
    return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
I think I understand why this condition is needed but it would be good to leave a comment explaining this change because it's pretty obscure. IIUC basically the problem is that there's no way to permanently leave the group with a static consumer right now, and the only method of removing static members is via an admin API. So we essentially have to "trick" the GroupMetadataManager into fencing this member to kick it out of the group, because if we use the LEAVE_GROUP_STATIC_MEMBER_EPOCH then it thinks it's just being "temporarily" removed and this won't actually result in the consumer leaving the group. So we use LEAVE_GROUP_MEMBER_EPOCH because anything that isn't the LEAVE_GROUP_STATIC_MEMBER_EPOCH triggers the GroupMetadataManager to fence the consumer in its #consumerGroupLeave method.
First off does that summary sound right? It's also pretty hacky, and while I think it's fine to proceed with this for now, I'm wondering if we shouldn't just try to implement proper LeaveGroup mechanics for static membership. I know this would require some server-side changes but maybe this can be a followup KIP? Again, no need to block this PR on that, just putting it out there. Feel free to pick that up yourself or file a ticket if that makes sense to you too
Your summary is entirely correct. I will add a comment in the next commit.
> I know this would require some server-side changes but maybe this can be a followup KIP? Again, no need to block this PR on that, just putting it out there. Feel free to pick that up yourself or file a ticket if that makes sense to you too.
I will file a ticket to track this one and see if we can gather some feedback.
Awesome, I think we're finally good to go with this! Thanks for picking this up and for being patient, this turned out to be more complex than it seemed and there were a lot of details to work out 🙂
Will merge to trunk
Merged to trunk (🥳) -- do you want to start working on the followup PR now to utilize this new API for
Hi @ableegoldman
Yes, I will start to tackle it soon 🚀 Thanks again 😸
Great to see this merged. I took the liberty of doing a small follow-up PR to improve the JavaDocs: #19546
JIRA: KAFKA-16758
This PR aims to deliver KIP-1092; please refer to KIP-1092 and KAFKA-16758 for further details.
Reviewers: Anna Sophie Blee-Goldman, Chia-Ping Tsai, Kirk True