Skip to content

Commit ab69d48

Browse files
committed
KAFKA-16758: Extend Consumer#close with an option to leave the group or not
1 parent 220c578 commit ab69d48

25 files changed

+525
-71
lines changed

checkstyle/suppressions.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@
129129
files="(OffsetFetcher|RequestResponse)Test.java"/>
130130

131131
<suppress checks="JavaNCSS"
132-
files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java"/>
132+
files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java|ConsumerMembershipManagerTest.java"/>
133133

134134
<suppress checks="NPathComplexity"
135135
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>

clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

+80
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.clients.consumer;
1818

19+
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
1920
import org.apache.kafka.common.Metric;
2021
import org.apache.kafka.common.MetricName;
2122
import org.apache.kafka.common.PartitionInfo;
@@ -28,6 +29,7 @@
2829
import java.util.Collection;
2930
import java.util.List;
3031
import java.util.Map;
32+
import java.util.Optional;
3133
import java.util.OptionalLong;
3234
import java.util.Set;
3335
import java.util.regex.Pattern;
@@ -277,13 +279,91 @@ public interface Consumer<K, V> extends Closeable {
277279
void close();
278280

279281
/**
282+
* This method has been deprecated since Kafka 4.0 and should use {@link Consumer#close(CloseOption)} instead.
283+
*
280284
* @see KafkaConsumer#close(Duration)
281285
*/
286+
@Deprecated
282287
void close(Duration timeout);
283288

284289
/**
285290
* @see KafkaConsumer#wakeup()
286291
*/
287292
void wakeup();
288293

294+
/**
295+
* @see KafkaConsumer#close(CloseOption)
296+
*/
297+
void close(final CloseOption option);
298+
299+
class CloseOption {
300+
301+
/**
302+
* Specifies the group membership operation upon shutdown.
303+
* By default, {@code GroupMembershipOperation.DEFAULT} will be applied, which follows the consumer's default behavior.
304+
*/
305+
protected GroupMembershipOperation operation = GroupMembershipOperation.DEFAULT;
306+
307+
/**
308+
* Specifies the maximum amount of time to wait for the close process to complete.
309+
* This allows users to define a custom timeout for gracefully stopping the consumer.
310+
* If no value is set, the default timeout {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
311+
*/
312+
protected Optional<Duration> timeout = Optional.empty();
313+
314+
private CloseOption() {
315+
}
316+
317+
protected CloseOption(final CloseOption option) {
318+
this.operation = option.operation;
319+
this.timeout = option.timeout;
320+
}
321+
322+
/**
323+
* Static method to create a {@code CloseOption} with a custom timeout.
324+
*
325+
* @param timeout the maximum time to wait for the consumer to close.
326+
* @return a new {@code CloseOption} instance with the specified timeout.
327+
*/
328+
public static CloseOption timeout(final Duration timeout) {
329+
CloseOption option = new CloseOption();
330+
option.timeout = Optional.ofNullable(timeout);
331+
return option;
332+
}
333+
334+
/**
335+
* Static method to create a {@code CloseOption} with a specified group membership operation.
336+
*
337+
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
338+
* or {@code DEFAULT}.
339+
* @return a new {@code CloseOption} instance with the specified group membership operation.
340+
*/
341+
public static CloseOption groupMembershipOperation(final GroupMembershipOperation operation) {
342+
CloseOption option = new CloseOption();
343+
option.operation = operation;
344+
return option;
345+
}
346+
347+
/**
348+
* Fluent method to set the timeout for the close process.
349+
*
350+
* @param timeout the maximum time to wait for the consumer to close. If {@code null}, the default timeout will be used.
351+
* @return this {@code CloseOption} instance.
352+
*/
353+
public CloseOption withTimeout(final Duration timeout) {
354+
this.timeout = Optional.ofNullable(timeout);
355+
return this;
356+
}
357+
358+
/**
359+
* Fluent method to set the group membership operation upon shutdown.
360+
*
361+
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
362+
* @return this {@code CloseOption} instance.
363+
*/
364+
public CloseOption withGroupMembershipOperation(final GroupMembershipOperation operation) {
365+
this.operation = operation;
366+
return this;
367+
}
368+
}
289369
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer;
18+
19+
/**
20+
* Enum to specify the group membership operation upon leaving group.
21+
*
22+
* <ul>
23+
* <li><b>{@code LEAVE_GROUP}</b>: means the consumer will leave the group.</li>
24+
* <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
25+
* <li><b>{@code DEFAULT}</b>: Applies the default behavior:
26+
* <ul>
27+
* <li>For <b>static members</b>: The consumer will remain in the group.</li>
28+
* <li>For <b>dynamic members</b>: The consumer will leave the group.</li>
29+
* </ul>
30+
* </li>
31+
* </ul>
32+
*/
33+
public enum GroupMembershipOperation {
34+
LEAVE_GROUP,
35+
REMAIN_IN_GROUP,
36+
DEFAULT
37+
}

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

+12
Original file line numberDiff line numberDiff line change
@@ -1776,6 +1776,7 @@ public void close() {
17761776
* @throws org.apache.kafka.common.KafkaException for any other error during close
17771777
*/
17781778
@Override
1779+
@SuppressWarnings("deprecation")
17791780
public void close(Duration timeout) {
17801781
delegate.close(timeout);
17811782
}
@@ -1790,6 +1791,17 @@ public void wakeup() {
17901791
delegate.wakeup();
17911792
}
17921793

1794+
/**
1795+
* This method allows the caller to specify shutdown behavior using the {@link CloseOption} class.
1796+
* If {@code null} is provided, the default behavior will be applied, equivalent to providing a new {@link CloseOption} instance.
1797+
*
1798+
* @param option see {@link CloseOption}
1799+
*/
1800+
@Override
1801+
public void close(CloseOption option) {
1802+
delegate.close(option);
1803+
}
1804+
17931805
// Functions below are for testing only
17941806
String clientId() {
17951807
return delegate.clientId();

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

+6
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,7 @@ public void close() {
557557
}
558558

559559
@Override
560+
@SuppressWarnings("deprecation")
560561
public synchronized void close(Duration timeout) {
561562
this.closed = true;
562563
}
@@ -570,6 +571,11 @@ public synchronized void wakeup() {
570571
wakeup.set(true);
571572
}
572573

574+
@Override
575+
public void close(CloseOption option) {
576+
this.closed = true;
577+
}
578+
573579
/**
574580
* Schedule a task to be executed during a poll(). One enqueued task will be executed per {@link #poll(Duration)}
575581
* invocation. You can use this repeatedly to mock out multiple responses to poll invocations.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

+29-16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.clients.ClientResponse;
2020
import org.apache.kafka.clients.CommonClientConfigs;
2121
import org.apache.kafka.clients.GroupRebalanceConfig;
22+
import org.apache.kafka.clients.consumer.GroupMembershipOperation;
2223
import org.apache.kafka.common.KafkaException;
2324
import org.apache.kafka.common.Node;
2425
import org.apache.kafka.common.errors.AuthenticationException;
@@ -1114,22 +1115,26 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
11141115
*/
11151116
@Override
11161117
public final void close() {
1117-
close(time.timer(0));
1118+
close(time.timer(0), GroupMembershipOperation.DEFAULT);
11181119
}
11191120

11201121
/**
11211122
* @throws KafkaException if the rebalance callback throws exception
11221123
*/
1123-
protected void close(Timer timer) {
1124+
protected void close(Timer timer, GroupMembershipOperation membershipOperation) {
11241125
try {
11251126
closeHeartbeatThread();
11261127
} finally {
11271128
// Synchronize after closing the heartbeat thread since heartbeat thread
11281129
// needs this lock to complete and terminate after close flag is set.
11291130
synchronized (this) {
1130-
if (rebalanceConfig.leaveGroupOnClose) {
1131+
// If membershipOperation is REMAIN_IN_GROUP, never send leave group request.
1132+
// If membershipOperation is DEFAULT, leave group based on rebalanceConfig.leaveGroupOnClose.
1133+
// Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
1134+
if (GroupMembershipOperation.REMAIN_IN_GROUP != membershipOperation &&
1135+
(GroupMembershipOperation.LEAVE_GROUP == membershipOperation || rebalanceConfig.leaveGroupOnClose)) {
11311136
onLeavePrepare();
1132-
maybeLeaveGroup("the consumer is being closed");
1137+
maybeLeaveGroup(membershipOperation, "the consumer is being closed");
11331138
}
11341139

11351140
// At this point, there may be pending commits (async commits or sync commits that were
@@ -1151,31 +1156,28 @@ protected void handlePollTimeoutExpiry() {
11511156
"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
11521157
"returned in poll() with max.poll.records.");
11531158

1154-
maybeLeaveGroup("consumer poll timeout has expired.");
1159+
maybeLeaveGroup(GroupMembershipOperation.DEFAULT, "consumer poll timeout has expired.");
11551160
}
11561161

11571162
/**
1158-
* Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership or is already
1159-
* not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).
1163+
* Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership
1164+
* with the default consumer group membership operation, or is already not part of the group (i.e., does not have a
1165+
* valid member ID, is in the UNJOINED state, or the coordinator is unknown).
11601166
*
1167+
* @param membershipOperation the operation on consumer group membership that the consumer will perform when closing
11611168
* @param leaveReason the reason to leave the group for logging
11621169
* @throws KafkaException if the rebalance callback throws exception
11631170
*/
1164-
public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
1171+
public synchronized RequestFuture<Void> maybeLeaveGroup(GroupMembershipOperation membershipOperation, String leaveReason) {
11651172
RequestFuture<Void> future = null;
11661173

1167-
// Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
1168-
// consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
1169-
// and the membership expiration is only controlled by session timeout.
1170-
if (isDynamicMember() && !coordinatorUnknown() &&
1171-
state != MemberState.UNJOINED && generation.hasMemberId()) {
1172-
// this is a minimal effort attempt to leave the group. we do not
1173-
// attempt any resending if the request fails or times out.
1174+
if (shouldSendLeaveGroupRequest(membershipOperation)) {
11741175
log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
11751176
generation.memberId, coordinator, leaveReason);
1177+
11761178
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
11771179
rebalanceConfig.groupId,
1178-
Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
1180+
List.of(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
11791181
);
11801182

11811183
future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));
@@ -1187,6 +1189,17 @@ public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
11871189
return future;
11881190
}
11891191

1192+
private boolean shouldSendLeaveGroupRequest(GroupMembershipOperation membershipOperation) {
1193+
// According to KIP-1092, static members can leave the group if the consumer group membership operation is LEAVE_GROUP.
1194+
// If the operation is REMAIN_IN_GROUP, this method "maybeLeaveGroup" will not be invoked.
1195+
return membershipOperation == GroupMembershipOperation.LEAVE_GROUP ||
1196+
// Below is the default behavior for consumer group membership:
1197+
// Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
1198+
// consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
1199+
// and the membership expiration is only controlled by session timeout.
1200+
(isDynamicMember() && !coordinatorUnknown() && state != MemberState.UNJOINED && generation.hasMemberId());
1201+
}
1202+
11901203
protected boolean isDynamicMember() {
11911204
return rebalanceConfig.groupInstanceId.isEmpty();
11921205
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.CommonClientConfigs;
2020
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.GroupMembershipOperation;
2122
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
2223
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
2324
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
@@ -221,7 +222,17 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
221222
*/
222223
@Override
223224
public PollResult pollOnClose(long currentTimeMs) {
224-
if (membershipManager().isLeavingGroup()) {
225+
AbstractMembershipManager<R> membershipManager = membershipManager();
226+
GroupMembershipOperation leaveGroupOperation = membershipManager.leaveGroupOperation();
227+
228+
if (membershipManager.isLeavingGroup() &&
229+
// Default operation: both static and dynamic consumers will send a leave heartbeat
230+
(GroupMembershipOperation.DEFAULT == leaveGroupOperation ||
231+
// Leave operation: both static and dynamic consumers will send a leave heartbeat
232+
GroupMembershipOperation.LEAVE_GROUP == leaveGroupOperation ||
233+
// Remain in group: only static consumers will send a leave heartbeat, while dynamic members will not
234+
membershipManager.groupInstanceId().isPresent())
235+
) {
225236
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true);
226237
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
227238
}

0 commit comments

Comments
 (0)