Skip to content

KAFKA-18185: remove internal.leave.group.on.close config #19400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 29 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a4380ba
KAFKA-16758: remove internal.leave.group.on.close config
frankvicky Apr 7, 2025
f482865
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 8, 2025
f522eb1
Adjust the logic of StreamThread#shutdown
frankvicky Apr 8, 2025
ed9f86d
Fix style issue
frankvicky Apr 8, 2025
9f47d9e
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 8, 2025
8a53219
apply leaveGroup to StreamThread#shutdown
frankvicky Apr 8, 2025
dee54ad
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 8, 2025
00e013f
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 8, 2025
aef9785
Use close api instead of admin api
frankvicky Apr 9, 2025
d723718
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 9, 2025
6d95fcc
Use close api instead of admin api
frankvicky Apr 9, 2025
43df632
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 9, 2025
e5a4d96
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 9, 2025
8649da6
fix failed tests
frankvicky Apr 9, 2025
61efd8e
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 10, 2025
2d5bec2
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 12, 2025
f5d1c72
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 14, 2025
1eed3f6
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 15, 2025
39c654e
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 16, 2025
cc80fb4
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 17, 2025
dc95556
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 18, 2025
62d8ed1
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 20, 2025
80f1771
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 21, 2025
c356ae5
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 24, 2025
38b3207
Remove timeout argument
frankvicky Apr 24, 2025
0f04f13
pass false to close method
frankvicky Apr 24, 2025
2f9a230
Add comment back
frankvicky Apr 25, 2025
d9e5eed
address comments
frankvicky Apr 25, 2025
ffb74cb
Merge branch 'trunk' into KAFKA-16758-follow-up
frankvicky Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public String toString() {
public final Optional<String> groupInstanceId;
public final long retryBackoffMs;
public final long retryBackoffMaxMs;
public final boolean leaveGroupOnClose;

public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
this.sessionTimeoutMs = config.getInt(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG);
Expand Down Expand Up @@ -75,13 +74,6 @@ public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {

this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG);

// Internal leave group config is only defined in Consumer.
if (protocolType == ProtocolType.CONSUMER) {
this.leaveGroupOnClose = config.getBoolean("internal.leave.group.on.close");
} else {
this.leaveGroupOnClose = true;
}
}

// For testing purpose.
Expand All @@ -91,15 +83,13 @@ public GroupRebalanceConfig(final int sessionTimeoutMs,
String groupId,
Optional<String> groupInstanceId,
long retryBackoffMs,
long retryBackoffMaxMs,
boolean leaveGroupOnClose) {
long retryBackoffMaxMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.groupId = groupId;
this.groupInstanceId = groupInstanceId;
this.retryBackoffMs = retryBackoffMs;
this.retryBackoffMaxMs = retryBackoffMaxMs;
this.leaveGroupOnClose = leaveGroupOnClose;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,17 +329,6 @@ public class ConsumerConfig extends AbstractConfig {
"be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;

/**
* <code>internal.leave.group.on.close</code>
* Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
* won't occur until <code>session.timeout.ms</code> expires.
*
* <p>
* Note: this is an internal configuration and could be changed in the future in a backward incompatible way
*
*/
static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";

/**
* <code>internal.throw.on.fetch.stable.offset.unsupported</code>
* Whether or not the consumer should throw when the new stable offset feature is supported.
Expand Down Expand Up @@ -637,10 +626,6 @@ public class ConsumerConfig extends AbstractConfig {
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW)
.defineInternal(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED,
Type.BOOLEAN,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@ protected void handlePollTimeoutExpiry() {
public synchronized RequestFuture<Void> maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) {
RequestFuture<Void> future = null;

if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) {
if (shouldSendLeaveGroupRequest(membershipOperation)) {
log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
generation.memberId, coordinator, leaveReason);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
groupId.get(),
groupInstanceId,
retryBackoffMs,
retryBackoffMaxMs,
true
retryBackoffMaxMs
);
this.coordinator = new ConsumerCoordinator(
rebalanceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,7 @@ private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs) {
Optional.empty(), Optional.empty());
}


private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
setupCoordinator(retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs, groupInstanceId, heartbeatThreadSupplier, groupInstanceId.isEmpty());
}

private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean leaveOnClose) {
LogContext logContext = new LogContext();
this.mockTime = new MockTime();
ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, retryBackoffMaxMs, 60 * 60 * 1000L,
Expand All @@ -167,8 +162,7 @@ false, false, new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST
GROUP_ID,
groupInstanceId,
retryBackoffMs,
retryBackoffMaxMs,
leaveOnClose);
retryBackoffMaxMs);
this.coordinator = new DummyCoordinator(rebalanceConfig,
consumerClient,
metrics,
Expand Down Expand Up @@ -1108,7 +1102,7 @@ public void testLeaveGroupSentWithGroupInstanceIdUnSet() {
@ParameterizedTest
@MethodSource("groupInstanceIdAndMembershipOperationMatrix")
public void testLeaveGroupSentWithGroupInstanceIdUnSetAndDifferentGroupMembershipOperation(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
checkLeaveGroupRequestSent(groupInstanceId, operation, Optional.empty(), true);
checkLeaveGroupRequestSent(groupInstanceId, operation, Optional.empty());
}

private static Stream<Arguments> groupInstanceIdAndMembershipOperationMatrix() {
Expand All @@ -1123,11 +1117,11 @@ private static Stream<Arguments> groupInstanceIdAndMembershipOperationMatrix() {
}

private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
checkLeaveGroupRequestSent(groupInstanceId, CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty(), groupInstanceId.isEmpty());
checkLeaveGroupRequestSent(groupInstanceId, CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty());
}

private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean leaveOnClose) {
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier, leaveOnClose);
private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier);

mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstance
groupId,
groupInstanceId,
retryBackoffMs,
retryBackoffMaxMs,
groupInstanceId.isEmpty());
retryBackoffMaxMs);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ public void setUp() {
"group_id",
Optional.empty(),
retryBackoffMs,
retryBackoffMaxMs,
true);
retryBackoffMaxMs);
heartbeat = new Heartbeat(rebalanceConfig, time);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ public void init(ConnectProtocolCompatibility compatibility) {
groupId,
Optional.empty(),
retryBackoffMs,
retryBackoffMaxMs,
true);
retryBackoffMaxMs);
this.coordinator = new WorkerCoordinator(rebalanceConfig,
loggerFactory,
consumerClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ public void setup(ConnectProtocolCompatibility compatibility) {
groupId,
Optional.empty(),
retryBackoffMs,
retryBackoffMaxMs,
true);
retryBackoffMaxMs);
this.coordinator = new WorkerCoordinator(rebalanceConfig,
logContext,
consumerClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.time.{Duration => JDuration}
import java.util.Arrays.asList
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Locale, Optional, Properties}
import java.util.{Collections, Optional, Properties}
import java.{time, util}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
Expand Down Expand Up @@ -2331,9 +2331,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
// We need to set internal.leave.group.on.close to validate dynamic member removal, but it only works for ClassicConsumer
// After KIP-1092, we can control dynamic member removal for both ClassicConsumer and AsyncConsumer
defaultConsumerConfig.setProperty("internal.leave.group.on.close", "false")

val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig)
groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
Expand Down Expand Up @@ -2382,14 +2379,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
var testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId)
assertFalse(testGroupDescription.isSimpleConsumerGroup)

// Although we set `internal.leave.group.on.close` in the consumer, it only works for ClassicConsumer.
// After KIP-1092, we can control dynamic member removal in consumer.close()
if (groupProtocol == GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)) {
assertEquals(3, testGroupDescription.members().size())
} else if (groupProtocol == GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) {
assertEquals(2, testGroupDescription.members().size())
}
assertEquals(2, testGroupDescription.members().size())

// Test delete one static member
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId,
Expand All @@ -2402,11 +2392,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()

if (groupProtocol == GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)) {
assertEquals(2, testGroupDescription.members().size())
} else if (groupProtocol == GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) {
assertEquals(1, testGroupDescription.members().size())
}
assertEquals(1, testGroupDescription.members().size())

// Delete all active members remaining
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions())
Expand Down
Loading