-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-18185: remove internal.leave.group.on.close config #19400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
+ " for the following reason: ", | ||
exception.getCause() | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO, this chunk of code could be replaced with the new close api.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! love to see all this code get deleted for one line 😄
@@ -875,7 +878,7 @@ public void run() { | |||
streamsUncaughtExceptionHandler.accept(e, false); | |||
// Note: the above call currently rethrows the exception, so nothing below this line will be executed | |||
} finally { | |||
completeShutdown(cleanRun); | |||
completeShutdown(cleanRun, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we close the StreamThread
, we should ensure consumers leave their group.
Does it make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah sorry i totally missed this question on my first pass 🤦♀️ No, we actually do want to stay in the group and retain the existing default behavior for Streams
if you're curious why the default behavior for Streams is the opposite of the plain consumer client, it's because Streams is stateful which means rebalancing and moving partitions can be a much heavier operation and involve restoring many records from the changelog topics when partitions are shuffled around. By skipping the leave group, we can often avoid an unnecessary extra rebalance that would result in partitions being moved, since the assignment of Streams is heavily dependent on the current group members. If an error gets thrown and the StreamThread dies and gets restarted within the session timeout, we can avoid any rebalances where the dead thread is missing, so the rebalance when the new thread restarts should result in the same assignment of partitions to threads as we had before the thread died.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation!
final boolean cleanResult = tryCleanGlobal(false, null, null, appID); | ||
assertFalse(cleanResult); | ||
assertTrue(cleanResult); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding #19400 (comment)
This assertion is no longer false.
Make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not 100% sure i follow, why does this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In https://github.com/apache/kafka/pull/19400/files#r2036678414
We have passed leaveGroup=true
to the close method, which will make all consumers leave the group.
To keep the original behavior, we should pass leaveGroup=false
to the close method.
I think we should maintain the original behavior for now; I will change it to false in the next commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh i see, I missed that we were passing leaveGroup=true
to the consumer before. Glad this test caught that, though maybe we should try to have a test specifically to make sure the default leave group behavior isn't ever changed by accident in the future?
@@ -1640,33 +1599,6 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu | |||
return close(Optional.of(timeoutMs), options.leaveGroup); | |||
} | |||
|
|||
private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ableegoldman
Do you think we should keep this method?
Before KIP-1092, the consumer relied on admin#removeMembersFromConsumerGroup
to leave the group.
As #19400 (comment) states, if we want to pass leaveGroup=false
and maintain the default behavior, I think we should retain this method. Once a consumer is shut down and does not leave the group, they must rely on the admin to remove them from the consumer group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also the reason that KafkaStreamsCloseOptionsIntegrationTest is failing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method is only ever called from KafkaStreams#close
, there's no separate method KafkaStreams method that can be used to remove consumer from the group via the admin client (side note: maybe there should be?). There's a cli tool for static member removal so users who don't set leaveGroup=true
in the CloseOptions can do so through this.
So the new consumer#close(CloseOptions.groupMembershipOperation(LEAVE_GROUP))
is essentially replacing this method entirely so in theory we should be able to remove it. That said, it's definitely concerning if KafkaStreamsCloseOptionsIntegrationTest is failing because we removed this. That suggests maybe our hack/workaround for removing static members in consumer#close
via intentionally getting fenced doesn't work?
If that's the case then we should go back to using the admin's #removeMembersFromConsumerGroup
for Streams for now, and can follow up with that KIP to do a proper fix for allowing static members to permanently leave via LeaveGroup.
Hi @ableegoldman |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the followup!
+ " for the following reason: ", | ||
exception.getCause() | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! love to see all this code get deleted for one line 😄
} catch (final Throwable e) { | ||
log.error("Failed to close consumer due to the following error:", e); | ||
} | ||
try { | ||
restoreConsumer.close(); | ||
restoreConsumer.close(closeOptions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the restore consumer direct assignment so it's not part of the consumer group and doesn't need the close options (I don't remember, what happens if you pass in LEAVE_GROUP or REMAIN_IN_GROUP for a consumer that doesn't use group subscription?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the consumer doesn't subscribe, it will not join a group.
In this case, it will get an unknown member ID in the response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should pass either DEFAULT or REMAIN_IN_GROUP in for the restore consumer so it doesn't try to leave the group. If I'm remembering correctly, when LEAVE_GROUP is used it always tries to send a leave group even if it's not in a group, whereas with DEFAULT it will skip the leaveGroup if there's no member id or the coordinator is unknown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm actually how could it even send a LeaveGroup if LEAVE_GROUP is selected for a consumer that isn't part of a group? That is, by definition the coordinator is unknown, so where would the request even be sent?
Don't remember of the top of my head how we handled this in the consumer client CloseOptions PR but it seems like the consumer should be agnostic to the GroupMembershipOperation and never send anything if it's not part of a group to begin with, right?
@@ -1994,6 +2005,13 @@ public Map<String, KafkaFuture<Uuid>> clientInstanceIds(final Duration timeout) | |||
return result; | |||
} | |||
|
|||
public void closeConsumer(final long timeoutMs, final boolean leaveGroup) { | |||
final CloseOptions.GroupMembershipOperation operation = leaveGroup ? LEAVE_GROUP : REMAIN_IN_GROUP; | |||
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(timeoutMs <= 0 ? 0 : timeoutMs)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's definitely a bit questionable that we don't pass the timeout in to consumer#close in the current code (interestingly we also don't for producer#close but do use the timeout for admin#close) But, I don't want to change that in this PR, on the off chance there was an actual reason for it
I know we had some issues with the producer timeout on close in the past but I don't remember the specifics and I'm not personally aware of any similar issues for the consumer's close. It's worth following up on, but separately from this. Let's just use the default for now (ie don't set the timeout at all)
final boolean cleanResult = tryCleanGlobal(false, null, null, appID); | ||
assertFalse(cleanResult); | ||
assertTrue(cleanResult); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not 100% sure i follow, why does this change?
tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
Outdated
Show resolved
Hide resolved
looks like KafkaStreamsCloseOptionsIntegrationTest is failing |
@frankvicky this isn't strictly related to this PR but there's a KIP (1153) out to fix up the API for the Streams version of CloseOptions, would be great if you could take a look and give a +1 once it moves to voting |
JIRA: KAFKA-18185
This is a follow-up of #17614 The patch is to remove the
internal.leave.group.on.close
config.