KAFKA-16758: Extend Consumer#close with an option to leave the group or not #17614

Merged: 43 commits, Apr 6, 2025
Commits
39a7004
KAFKA-16758: Extend Consumer#close with an option to leave the group …
frankvicky Oct 26, 2024
837d846
Merge branch 'trunk' into KAFKA-16758
frankvicky Feb 27, 2025
9906e28
Address by comments
frankvicky Feb 27, 2025
b0bce75
revert unnecessary changes
frankvicky Feb 27, 2025
9c7d269
refactor by parameterizedTest
frankvicky Feb 27, 2025
1941081
Merge branch 'trunk' into KAFKA-16758
frankvicky Feb 27, 2025
c03c5f3
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 2, 2025
840395f
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 4, 2025
c18babb
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 6, 2025
a26310d
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 7, 2025
696d207
skip close heartbeat for dynamic async consumer
frankvicky Mar 7, 2025
87be03e
adjust leave group epoch
frankvicky Mar 7, 2025
72f5473
fix failed tests and revert unnecessary changes
frankvicky Mar 8, 2025
df7afca
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 8, 2025
f20f6d7
Adjust test and pollOnClose flow
frankvicky Mar 8, 2025
7da92e4
adjust pollOnClose logic
frankvicky Mar 8, 2025
f498a8e
rename variable
frankvicky Mar 8, 2025
e96c5f5
rearrange method
frankvicky Mar 8, 2025
22138d3
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 11, 2025
9040cd2
fix conflicts
frankvicky Mar 11, 2025
06b51a8
apply spotless
frankvicky Mar 11, 2025
26a62bd
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 12, 2025
68db56c
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 13, 2025
3e5e288
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 15, 2025
7513446
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 16, 2025
3d4ed4a
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 17, 2025
9d45444
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 20, 2025
1e18a34
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 21, 2025
204cb83
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 22, 2025
fb4055b
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 23, 2025
e5c9fa5
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 24, 2025
ab89078
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 25, 2025
61a6346
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 26, 2025
fbc878a
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 27, 2025
6143d52
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 29, 2025
17d3ee2
Merge branch 'trunk' into KAFKA-16758
frankvicky Mar 30, 2025
3bf1e26
Merge branch 'trunk' into KAFKA-16758
frankvicky Apr 1, 2025
11fb74f
Merge branch 'trunk' into KAFKA-16758
frankvicky Apr 4, 2025
076ea2a
Merge branch 'trunk' into KAFKA-16758
frankvicky Apr 5, 2025
e946499
Address comments
frankvicky Apr 5, 2025
8ee016d
Address comments
frankvicky Apr 5, 2025
7fc4237
change scope to private
frankvicky Apr 5, 2025
f37d223
Fix failed tests
frankvicky Apr 5, 2025
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -129,7 +129,7 @@
files="(OffsetFetcher|RequestResponse)Test.java"/>

<suppress checks="JavaNCSS"
- files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java"/>
+ files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java|ConsumerMembershipManagerTest.java"/>

<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.consumer.internals.ConsumerUtils;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

public class CloseOptions {
Member

Please update KIP for this naming change.

/**
* Enum to specify the group membership operation upon leaving group.
*
* <ul>
* <li><b>{@code LEAVE_GROUP}</b>: means the consumer will leave the group.</li>
* <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
* <li><b>{@code DEFAULT}</b>: Applies the default behavior:
* <ul>
* <li>For <b>static members</b>: The consumer will remain in the group.</li>
* <li>For <b>dynamic members</b>: The consumer will leave the group.</li>
* </ul>
* </li>
* </ul>
*/
public enum GroupMembershipOperation {
LEAVE_GROUP,
REMAIN_IN_GROUP,
DEFAULT
}

/**
* Specifies the group membership operation upon shutdown.
* By default, {@code GroupMembershipOperation.DEFAULT} will be applied, which follows the consumer's default behavior.
*/
protected GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;

/**
* Specifies the maximum amount of time to wait for the close process to complete.
* This allows users to define a custom timeout for gracefully stopping the consumer.
* If no value is set, the default timeout {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
*/
protected Optional<Duration> timeout = Optional.empty();

private CloseOptions() {
}

protected CloseOptions(final CloseOptions option) {
Member

Excuse me, why do we have a protected constructor?

Contributor Author

Previously, we had an internal class, but we removed it during development.
I will file a patch to remove this constructor.

this.operation = option.operation;
this.timeout = option.timeout;
}

/**
* Static method to create a {@code CloseOptions} with a custom timeout.
*
* @param timeout the maximum time to wait for the consumer to close.
* @return a new {@code CloseOptions} instance with the specified timeout.
*/
public static CloseOptions timeout(final Duration timeout) {
CloseOptions option = new CloseOptions();
option.timeout = Optional.ofNullable(timeout);
return option;
Member

return new CloseOptions().withTimeout(timeout);

}

/**
* Static method to create a {@code CloseOptions} with a specified group membership operation.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
* or {@code DEFAULT}.
* @return a new {@code CloseOptions} instance with the specified group membership operation.
*/
public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
Objects.requireNonNull(operation, "operation should not be null");
Member

Could you please consider using return new CloseOptions().withGroupMembershipOperation(operation); to avoid duplicate code?

CloseOptions option = new CloseOptions();
option.operation = operation;
return option;
}

/**
* Fluent method to set the timeout for the close process.
*
* @param timeout the maximum time to wait for the consumer to close. If {@code null}, the default timeout will be used.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withTimeout(final Duration timeout) {
this.timeout = Optional.ofNullable(timeout);
return this;
}

/**
* Fluent method to set the group membership operation upon shutdown.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
Member

Could you please add unit test for those helpers?

Contributor Author

This is tracked by https://issues.apache.org/jira/browse/KAFKA-18267
I will file a patch for it soon.

Objects.requireNonNull(operation, "operation should not be null");
this.operation = operation;
return this;
}

public GroupMembershipOperation groupMembershipOperation() {
return operation;
}

public Optional<Duration> timeout() {
return timeout;
}

}
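
A minimal usage sketch of the options API above. To stay runnable without the Kafka client jar, `CloseOptionsSketch` is a hypothetical stand-in that mirrors the public surface of `CloseOptions` as shown in this diff. Its static factories delegate to the fluent setters, as the review comments suggest; that delegation is an assumption about a possible follow-up, not what this commit contains.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in mirroring the CloseOptions class in this diff.
final class CloseOptionsSketch {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    private GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;
    private Optional<Duration> timeout = Optional.empty();

    private CloseOptionsSketch() { }

    // Static factories delegate to the fluent setters (reviewer suggestion),
    // so null handling and validation live in one place.
    static CloseOptionsSketch timeout(Duration timeout) {
        return new CloseOptionsSketch().withTimeout(timeout);
    }

    static CloseOptionsSketch groupMembershipOperation(GroupMembershipOperation operation) {
        return new CloseOptionsSketch().withGroupMembershipOperation(operation);
    }

    CloseOptionsSketch withTimeout(Duration timeout) {
        // A null timeout is stored as empty, meaning "use the default timeout".
        this.timeout = Optional.ofNullable(timeout);
        return this;
    }

    CloseOptionsSketch withGroupMembershipOperation(GroupMembershipOperation operation) {
        this.operation = Objects.requireNonNull(operation, "operation should not be null");
        return this;
    }

    GroupMembershipOperation groupMembershipOperation() {
        return operation;
    }

    Optional<Duration> timeout() {
        return timeout;
    }
}
```

With the real class from this diff, the equivalent call on a consumer would be `consumer.close(CloseOptions.timeout(Duration.ofSeconds(5)).withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP))`.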
@@ -277,13 +277,20 @@ public interface Consumer<K, V> extends Closeable {
void close();

/**
* This method has been deprecated since Kafka 4.0 and should use {@link Consumer#close(CloseOptions)} instead.
*
* @see KafkaConsumer#close(Duration)
*/
@Deprecated
void close(Duration timeout);

/**
* @see KafkaConsumer#wakeup()
*/
void wakeup();

/**
* @see KafkaConsumer#close(CloseOptions)
*/
void close(final CloseOptions option);
}
@@ -1798,6 +1798,7 @@ public void close() {
* @throws org.apache.kafka.common.KafkaException for any other error during close
*/
@Override
@SuppressWarnings("deprecation")
public void close(Duration timeout) {
delegate.close(timeout);
}
@@ -1812,6 +1813,17 @@ public void wakeup() {
delegate.wakeup();
}

/**
* This method allows the caller to specify shutdown behavior using the {@link CloseOptions} class.
* If {@code null} is provided, the default behavior will be applied, equivalent to providing a new {@link CloseOptions} instance.
*
* @param option see {@link CloseOptions}
*/
@Override
public void close(CloseOptions option) {
delegate.close(option);
}

// Functions below are for testing only
String clientId() {
return delegate.clientId();
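
The Javadoc above says a `null` option behaves like a fresh `CloseOptions`, and the `timeout` field documentation says an unset timeout falls back to `ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS`. The sketch below shows one way a delegate could normalize those inputs. Every name in it is hypothetical, and the 30-second constant is an assumed value, since the real constant is not shown in this diff.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper: normalizes the argument of close(CloseOptions).
final class CloseArgumentsSketch {
    enum Operation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // ASSUMPTION: stand-in for ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; value not taken from this diff.
    static final long ASSUMED_DEFAULT_CLOSE_TIMEOUT_MS = 30_000L;

    // Minimal stand-in for CloseOptions: an operation plus an optional timeout.
    static final class Options {
        final Operation operation;
        final Optional<Duration> timeout;

        Options(Operation operation, Optional<Duration> timeout) {
            this.operation = operation;
            this.timeout = timeout;
        }
    }

    // A null option is treated like default options: DEFAULT membership operation.
    static Operation effectiveOperation(Options option) {
        return option == null ? Operation.DEFAULT : option.operation;
    }

    // A null option or an empty timeout falls back to the assumed default timeout.
    static Duration effectiveTimeout(Options option) {
        Optional<Duration> requested = option == null ? Optional.<Duration>empty() : option.timeout;
        return requested.orElse(Duration.ofMillis(ASSUMED_DEFAULT_CLOSE_TIMEOUT_MS));
    }
}
```

Resolving both values once at the entry point means the rest of the close path never has to handle `null` or an empty `Optional`.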
@@ -549,6 +549,7 @@ public void close() {
}

@Override
@SuppressWarnings("deprecation")
public synchronized void close(Duration timeout) {
this.closed = true;
}
@@ -562,6 +563,11 @@ public synchronized void wakeup() {
wakeup.set(true);
}

@Override
public void close(CloseOptions option) {
this.closed = true;
}

/**
* Schedule a task to be executed during a poll(). One enqueued task will be executed per {@link #poll(Duration)}
* invocation. You can use this repeatedly to mock out multiple responses to poll invocations.
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -85,6 +86,9 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;

/**
* AbstractCoordinator implements group management for a single group member by interacting with
* a designated Kafka broker (the coordinator). Group semantics are provided by extending this class.
@@ -1116,23 +1120,21 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
*/
@Override
public final void close() {
- close(time.timer(0));
+ close(time.timer(0), DEFAULT);
}

/**
* @throws KafkaException if the rebalance callback throws exception
*/
- protected void close(Timer timer) {
+ protected void close(Timer timer, CloseOptions.GroupMembershipOperation membershipOperation) {
try {
closeHeartbeatThread();
} finally {
// Synchronize after closing the heartbeat thread since heartbeat thread
// needs this lock to complete and terminate after close flag is set.
synchronized (this) {
- if (rebalanceConfig.leaveGroupOnClose) {
- onLeavePrepare();
- maybeLeaveGroup("the consumer is being closed");
- }
+ onLeavePrepare();
+ maybeLeaveGroup(membershipOperation, "the consumer is being closed");

// At this point, there may be pending commits (async commits or sync commits that were
// interrupted using wakeup) and the leave group request which have been queued, but not
@@ -1153,26 +1155,22 @@ protected void handlePollTimeoutExpiry() {
"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
"returned in poll() with max.poll.records.");

- maybeLeaveGroup("consumer poll timeout has expired.");
+ maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired.");
}

/**
- * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership or is already
- * not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).
+ * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership
+ * with the default consumer group membership operation, or is already not part of the group (i.e., does not have a
+ * valid member ID, is in the UNJOINED state, or the coordinator is unknown).
*
+ * @param membershipOperation the operation on consumer group membership that the consumer will perform when closing
* @param leaveReason the reason to leave the group for logging
* @throws KafkaException if the rebalance callback throws exception
*/
- public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
+ public synchronized RequestFuture<Void> maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) {
RequestFuture<Void> future = null;

- // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
- // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
- // and the membership expiration is only controlled by session timeout.
- if (isDynamicMember() && !coordinatorUnknown() &&
- state != MemberState.UNJOINED && generation.hasMemberId()) {
- // this is a minimal effort attempt to leave the group. we do not
- // attempt any resending if the request fails or times out.
+ if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) {
log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
generation.memberId, coordinator, leaveReason);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
@@ -1189,6 +1187,14 @@ public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
return future;
}

private boolean shouldSendLeaveGroupRequest(CloseOptions.GroupMembershipOperation membershipOperation) {
if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation.hasMemberId()) {
return membershipOperation == LEAVE_GROUP || (isDynamicMember() && membershipOperation == DEFAULT);
} else {
return false;
}
}

protected boolean isDynamicMember() {
return rebalanceConfig.groupInstanceId.isEmpty();
}
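
The combined condition in `maybeLeaveGroup` and `shouldSendLeaveGroupRequest` can be restated as a pure function, which makes the static/dynamic cases easier to check. This is a standalone sketch, not the coordinator code: the class name and the boolean parameters are hypothetical, and `inGroup` collapses the three checks in the diff (coordinator known, state not `UNJOINED`, member ID present) into one flag.

```java
// Hypothetical restatement of the leave-group decision from this diff as a pure function.
final class LeaveGroupDecisionSketch {
    enum Operation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    /**
     * @param leaveGroupOnClose mirrors rebalanceConfig.leaveGroupOnClose, which still gates the request
     * @param inGroup           coordinator known, state != UNJOINED, and a member ID is present
     * @param dynamicMember     true when no group.instance.id is configured
     * @param operation         the requested group membership operation
     */
    static boolean shouldSendLeaveGroup(boolean leaveGroupOnClose,
                                        boolean inGroup,
                                        boolean dynamicMember,
                                        Operation operation) {
        if (!leaveGroupOnClose || !inGroup) {
            return false;
        }
        // An explicit LEAVE_GROUP always sends; DEFAULT sends only for dynamic members.
        return operation == Operation.LEAVE_GROUP
            || (dynamicMember && operation == Operation.DEFAULT);
    }
}
```

For a member that is in the group with `leaveGroupOnClose` enabled, a dynamic member sends the request for `DEFAULT` and `LEAVE_GROUP` but not for `REMAIN_IN_GROUP`, while a static member sends it only for `LEAVE_GROUP`.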
@@ -181,8 +181,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
}

- // Case 1: The member is leaving
- boolean heartbeatNow = membershipManager().state() == MemberState.LEAVING ||
+ // Case 1: The member state is LEAVING - if the member is a share consumer, we should immediately send leave;
+ // if the member is an async consumer, this will also depend on leavingGroupOperation.
+ boolean heartbeatNow = shouldSendLeaveHeartbeatNow() ||
// Case 2: The member state indicates it should send a heartbeat without waiting for the interval,
// and there is no heartbeat request currently in-flight
(membershipManager().shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight());
@@ -201,6 +202,11 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
*/
public abstract AbstractMembershipManager<R> membershipManager();

/**
* @return the member should send leave heartbeat immediately or not
*/
protected abstract boolean shouldSendLeaveHeartbeatNow();

/**
* Generate a heartbeat request to leave the group if the state is still LEAVING when this is
* called to close the consumer.
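
The new abstract hook follows a template-method shape: the base class combines the hook with the existing Case 2 check, and each consumer type supplies its own leave policy. The sketch below illustrates that shape only. All class names are hypothetical, and the async policy (skip the leave heartbeat when the operation is `REMAIN_IN_GROUP`) is an assumption, because the concrete overrides are not part of the hunks shown here.

```java
// Hypothetical template-method sketch of the heartbeatNow decision in this diff.
abstract class HeartbeatPolicySketch {
    enum MemberState { STABLE, LEAVING }

    // Corresponds to the abstract shouldSendLeaveHeartbeatNow() hook added in the diff.
    protected abstract boolean shouldSendLeaveHeartbeatNow();

    // Case 1: the leave hook fires, or Case 2: the state asks for a heartbeat and none is in flight.
    final boolean heartbeatNow(boolean shouldHeartbeatNow, boolean requestInFlight) {
        return shouldSendLeaveHeartbeatNow() || (shouldHeartbeatNow && !requestInFlight);
    }
}

// Share consumer: per the diff comment, LEAVING means the leave heartbeat is sent immediately.
final class ShareHeartbeatSketch extends HeartbeatPolicySketch {
    private final MemberState state;

    ShareHeartbeatSketch(MemberState state) {
        this.state = state;
    }

    @Override
    protected boolean shouldSendLeaveHeartbeatNow() {
        return state == MemberState.LEAVING;
    }
}

// Async consumer: the diff only says the result "also depends on" the leave operation.
final class AsyncHeartbeatSketch extends HeartbeatPolicySketch {
    enum Operation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    private final MemberState state;
    private final Operation operation;

    AsyncHeartbeatSketch(MemberState state, Operation operation) {
        this.state = state;
        this.operation = operation;
    }

    @Override
    protected boolean shouldSendLeaveHeartbeatNow() {
        // ASSUMPTION: REMAIN_IN_GROUP suppresses the leave heartbeat; the real rule may differ.
        return state == MemberState.LEAVING && operation != Operation.REMAIN_IN_GROUP;
    }
}
```

Keeping the hook abstract lets the shared `poll` logic stay unchanged while each consumer type decides whether a `LEAVING` state should trigger an immediate heartbeat.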