Skip to content

Commit 74acbd2

Browse files
authored
KAFKA-16758: Extend Consumer#close with an option to leave the group or not (#17614)
JIRA: [KAFKA-16758](https://issues.apache.org/jira/browse/KAFKA-16758) This PR is aim to deliver [KIP-1092](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=321719077), please refer to KIP-1092 and KAFKA-16758 for further details. Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Chia-Ping Tsai <[email protected]>, Kirk True <[email protected]>
1 parent b963e58 commit 74acbd2

23 files changed

+479
-78
lines changed

checkstyle/suppressions.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@
129129
files="(OffsetFetcher|RequestResponse)Test.java"/>
130130

131131
<suppress checks="JavaNCSS"
132-
files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java"/>
132+
files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java|ConsumerMembershipManagerTest.java"/>
133133

134134
<suppress checks="NPathComplexity"
135135
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer;
18+
19+
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
20+
21+
import java.time.Duration;
22+
import java.util.Objects;
23+
import java.util.Optional;
24+
25+
public class CloseOptions {
26+
/**
27+
* Enum to specify the group membership operation upon leaving group.
28+
*
29+
* <ul>
30+
* <li><b>{@code LEAVE_GROUP}</b>: means the consumer will leave the group.</li>
31+
* <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
32+
* <li><b>{@code DEFAULT}</b>: Applies the default behavior:
33+
* <ul>
34+
* <li>For <b>static members</b>: The consumer will remain in the group.</li>
35+
* <li>For <b>dynamic members</b>: The consumer will leave the group.</li>
36+
* </ul>
37+
* </li>
38+
* </ul>
39+
*/
40+
public enum GroupMembershipOperation {
41+
LEAVE_GROUP,
42+
REMAIN_IN_GROUP,
43+
DEFAULT
44+
}
45+
46+
/**
47+
* Specifies the group membership operation upon shutdown.
48+
* By default, {@code GroupMembershipOperation.DEFAULT} will be applied, which follows the consumer's default behavior.
49+
*/
50+
protected GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;
51+
52+
/**
53+
* Specifies the maximum amount of time to wait for the close process to complete.
54+
* This allows users to define a custom timeout for gracefully stopping the consumer.
55+
* If no value is set, the default timeout {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
56+
*/
57+
protected Optional<Duration> timeout = Optional.empty();
58+
59+
private CloseOptions() {
60+
}
61+
62+
protected CloseOptions(final CloseOptions option) {
63+
this.operation = option.operation;
64+
this.timeout = option.timeout;
65+
}
66+
67+
/**
68+
* Static method to create a {@code CloseOptions} with a custom timeout.
69+
*
70+
* @param timeout the maximum time to wait for the consumer to close.
71+
* @return a new {@code CloseOptions} instance with the specified timeout.
72+
*/
73+
public static CloseOptions timeout(final Duration timeout) {
74+
CloseOptions option = new CloseOptions();
75+
option.timeout = Optional.ofNullable(timeout);
76+
return option;
77+
}
78+
79+
/**
80+
* Static method to create a {@code CloseOptions} with a specified group membership operation.
81+
*
82+
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
83+
* or {@code DEFAULT}.
84+
* @return a new {@code CloseOptions} instance with the specified group membership operation.
85+
*/
86+
public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
87+
Objects.requireNonNull(operation, "operation should not be null");
88+
CloseOptions option = new CloseOptions();
89+
option.operation = operation;
90+
return option;
91+
}
92+
93+
/**
94+
* Fluent method to set the timeout for the close process.
95+
*
96+
* @param timeout the maximum time to wait for the consumer to close. If {@code null}, the default timeout will be used.
97+
* @return this {@code CloseOptions} instance.
98+
*/
99+
public CloseOptions withTimeout(final Duration timeout) {
100+
this.timeout = Optional.ofNullable(timeout);
101+
return this;
102+
}
103+
104+
/**
105+
* Fluent method to set the group membership operation upon shutdown.
106+
*
107+
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
108+
* @return this {@code CloseOptions} instance.
109+
*/
110+
public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
111+
Objects.requireNonNull(operation, "operation should not be null");
112+
this.operation = operation;
113+
return this;
114+
}
115+
116+
public GroupMembershipOperation groupMembershipOperation() {
117+
return operation;
118+
}
119+
120+
public Optional<Duration> timeout() {
121+
return timeout;
122+
}
123+
124+
}

clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

+7
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,20 @@ public interface Consumer<K, V> extends Closeable {
277277
void close();
278278

279279
/**
280+
* This method has been deprecated since Kafka 4.0 and should use {@link Consumer#close(CloseOptions)} instead.
281+
*
280282
* @see KafkaConsumer#close(Duration)
281283
*/
284+
@Deprecated
282285
void close(Duration timeout);
283286

284287
/**
285288
* @see KafkaConsumer#wakeup()
286289
*/
287290
void wakeup();
288291

292+
/**
293+
* @see KafkaConsumer#close(CloseOptions)
294+
*/
295+
void close(final CloseOptions option);
289296
}

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

+12
Original file line numberDiff line numberDiff line change
@@ -1798,6 +1798,7 @@ public void close() {
17981798
* @throws org.apache.kafka.common.KafkaException for any other error during close
17991799
*/
18001800
@Override
1801+
@SuppressWarnings("deprecation")
18011802
public void close(Duration timeout) {
18021803
delegate.close(timeout);
18031804
}
@@ -1812,6 +1813,17 @@ public void wakeup() {
18121813
delegate.wakeup();
18131814
}
18141815

1816+
/**
1817+
* This method allows the caller to specify shutdown behavior using the {@link CloseOptions} class.
1818+
* If {@code null} is provided, the default behavior will be applied, equivalent to providing a new {@link CloseOptions} instance.
1819+
*
1820+
* @param option see {@link CloseOptions}
1821+
*/
1822+
@Override
1823+
public void close(CloseOptions option) {
1824+
delegate.close(option);
1825+
}
1826+
18151827
// Functions below are for testing only
18161828
String clientId() {
18171829
return delegate.clientId();

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

+6
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,7 @@ public void close() {
549549
}
550550

551551
@Override
552+
@SuppressWarnings("deprecation")
552553
public synchronized void close(Duration timeout) {
553554
this.closed = true;
554555
}
@@ -562,6 +563,11 @@ public synchronized void wakeup() {
562563
wakeup.set(true);
563564
}
564565

566+
@Override
567+
public void close(CloseOptions option) {
568+
this.closed = true;
569+
}
570+
565571
/**
566572
* Schedule a task to be executed during a poll(). One enqueued task will be executed per {@link #poll(Duration)}
567573
* invocation. You can use this repeatedly to mock out multiple responses to poll invocations.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

+23-17
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.clients.ClientResponse;
2020
import org.apache.kafka.clients.CommonClientConfigs;
2121
import org.apache.kafka.clients.GroupRebalanceConfig;
22+
import org.apache.kafka.clients.consumer.CloseOptions;
2223
import org.apache.kafka.common.KafkaException;
2324
import org.apache.kafka.common.Node;
2425
import org.apache.kafka.common.errors.AuthenticationException;
@@ -85,6 +86,9 @@
8586
import java.util.concurrent.TimeUnit;
8687
import java.util.function.Supplier;
8788

89+
import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
90+
import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
91+
8892
/**
8993
* AbstractCoordinator implements group management for a single group member by interacting with
9094
* a designated Kafka broker (the coordinator). Group semantics are provided by extending this class.
@@ -1116,23 +1120,21 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
11161120
*/
11171121
@Override
11181122
public final void close() {
1119-
close(time.timer(0));
1123+
close(time.timer(0), DEFAULT);
11201124
}
11211125

11221126
/**
11231127
* @throws KafkaException if the rebalance callback throws exception
11241128
*/
1125-
protected void close(Timer timer) {
1129+
protected void close(Timer timer, CloseOptions.GroupMembershipOperation membershipOperation) {
11261130
try {
11271131
closeHeartbeatThread();
11281132
} finally {
11291133
// Synchronize after closing the heartbeat thread since heartbeat thread
11301134
// needs this lock to complete and terminate after close flag is set.
11311135
synchronized (this) {
1132-
if (rebalanceConfig.leaveGroupOnClose) {
1133-
onLeavePrepare();
1134-
maybeLeaveGroup("the consumer is being closed");
1135-
}
1136+
onLeavePrepare();
1137+
maybeLeaveGroup(membershipOperation, "the consumer is being closed");
11361138

11371139
// At this point, there may be pending commits (async commits or sync commits that were
11381140
// interrupted using wakeup) and the leave group request which have been queued, but not
@@ -1153,26 +1155,22 @@ protected void handlePollTimeoutExpiry() {
11531155
"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
11541156
"returned in poll() with max.poll.records.");
11551157

1156-
maybeLeaveGroup("consumer poll timeout has expired.");
1158+
maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired.");
11571159
}
11581160

11591161
/**
1160-
* Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership or is already
1161-
* not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).
1162+
* Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership
1163+
* with the default consumer group membership operation, or is already not part of the group (i.e., does not have a
1164+
* valid member ID, is in the UNJOINED state, or the coordinator is unknown).
11621165
*
1166+
* @param membershipOperation the operation on consumer group membership that the consumer will perform when closing
11631167
* @param leaveReason the reason to leave the group for logging
11641168
* @throws KafkaException if the rebalance callback throws exception
11651169
*/
1166-
public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
1170+
public synchronized RequestFuture<Void> maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) {
11671171
RequestFuture<Void> future = null;
11681172

1169-
// Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
1170-
// consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
1171-
// and the membership expiration is only controlled by session timeout.
1172-
if (isDynamicMember() && !coordinatorUnknown() &&
1173-
state != MemberState.UNJOINED && generation.hasMemberId()) {
1174-
// this is a minimal effort attempt to leave the group. we do not
1175-
// attempt any resending if the request fails or times out.
1173+
if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) {
11761174
log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
11771175
generation.memberId, coordinator, leaveReason);
11781176
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
@@ -1189,6 +1187,14 @@ public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
11891187
return future;
11901188
}
11911189

1190+
private boolean shouldSendLeaveGroupRequest(CloseOptions.GroupMembershipOperation membershipOperation) {
1191+
if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation.hasMemberId()) {
1192+
return membershipOperation == LEAVE_GROUP || (isDynamicMember() && membershipOperation == DEFAULT);
1193+
} else {
1194+
return false;
1195+
}
1196+
}
1197+
11921198
protected boolean isDynamicMember() {
11931199
return rebalanceConfig.groupInstanceId.isEmpty();
11941200
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
181181
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
182182
}
183183

184-
// Case 1: The member is leaving
185-
boolean heartbeatNow = membershipManager().state() == MemberState.LEAVING ||
184+
// Case 1: The member state is LEAVING - if the member is a share consumer, we should immediately send leave;
185+
// if the member is an async consumer, this will also depend on leavingGroupOperation.
186+
boolean heartbeatNow = shouldSendLeaveHeartbeatNow() ||
186187
// Case 2: The member state indicates it should send a heartbeat without waiting for the interval,
187188
// and there is no heartbeat request currently in-flight
188189
(membershipManager().shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight());
@@ -201,6 +202,11 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
201202
*/
202203
public abstract AbstractMembershipManager<R> membershipManager();
203204

205+
/**
206+
* @return the member should send leave heartbeat immediately or not
207+
*/
208+
protected abstract boolean shouldSendLeaveHeartbeatNow();
209+
204210
/**
205211
* Generate a heartbeat request to leave the group if the state is still LEAVING when this is
206212
* called to close the consumer.

0 commit comments

Comments
 (0)