Skip to content

Commit 0c5e5c5

Browse files
authored
KAFKA-18329; [2/3] Delete old group coordinator (KIP-848) (#19251)
This patch is the second of a series of patches to remove the old group coordinator. With the release of Apache Kafka 4.0, the so-called new group coordinator is the default and only option available now. The patch removes `group.coordinator.new.enable` (internal config) and all its usages (integration tests, unit tests, etc.). It also cleans up `KafkaApis` to remove logic only used by the old group coordinator. Reviewers: Jeff Kim <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 1ded681 commit 0c5e5c5

24 files changed

+61
-551
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,13 @@
4444
public class ConsumerIntegrationTest {
4545

4646
@ClusterTests({
47-
@ClusterTest(serverProperties = {
48-
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
49-
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
50-
@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false")
51-
}),
5247
@ClusterTest(serverProperties = {
5348
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
5449
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
5550
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic")
5651
})
5752
})
58-
public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInstance) throws Exception {
53+
public void testAsyncConsumerWithConsumerProtocolDisabled(ClusterInstance clusterInstance) throws Exception {
5954
String topic = "test-topic";
6055
clusterInstance.createTopic(topic, 1, (short) 1);
6156
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(Map.of(

core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala

-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ private[group] class GroupCoordinatorAdapter(
6767
private val time: Time
6868
) extends org.apache.kafka.coordinator.group.GroupCoordinator {
6969

70-
override def isNewGroupCoordinator: Boolean = false
71-
7270
override def consumerGroupHeartbeat(
7371
context: RequestContext,
7472
request: ConsumerGroupHeartbeatRequestData

core/src/main/scala/kafka/server/BrokerServer.scala

+27-36
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package kafka.server
1919

20-
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter}
20+
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter}
2121
import kafka.coordinator.transaction.TransactionCoordinator
2222
import kafka.log.LogManager
2323
import kafka.log.remote.RemoteLogManager
@@ -622,41 +622,32 @@ class BrokerServer(
622622
// Create group coordinator, but don't start it until we've started replica manager.
623623
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good
624624
// to fix the underlying issue.
625-
if (config.isNewGroupCoordinatorEnabled) {
626-
val time = Time.SYSTEM
627-
val serde = new GroupCoordinatorRecordSerde
628-
val timer = new SystemTimerReaper(
629-
"group-coordinator-reaper",
630-
new SystemTimer("group-coordinator")
631-
)
632-
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
633-
time,
634-
replicaManager,
635-
serde,
636-
config.groupCoordinatorConfig.offsetsLoadBufferSize
637-
)
638-
val writer = new CoordinatorPartitionWriter(
639-
replicaManager
640-
)
641-
new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig)
642-
.withTime(time)
643-
.withTimer(timer)
644-
.withLoader(loader)
645-
.withWriter(writer)
646-
.withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics))
647-
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
648-
.withGroupConfigManager(groupConfigManager)
649-
.withPersister(persister)
650-
.withAuthorizer(authorizer.toJava)
651-
.build()
652-
} else {
653-
GroupCoordinatorAdapter(
654-
config,
655-
replicaManager,
656-
Time.SYSTEM,
657-
metrics
658-
)
659-
}
625+
val time = Time.SYSTEM
626+
val serde = new GroupCoordinatorRecordSerde
627+
val timer = new SystemTimerReaper(
628+
"group-coordinator-reaper",
629+
new SystemTimer("group-coordinator")
630+
)
631+
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
632+
time,
633+
replicaManager,
634+
serde,
635+
config.groupCoordinatorConfig.offsetsLoadBufferSize
636+
)
637+
val writer = new CoordinatorPartitionWriter(
638+
replicaManager
639+
)
640+
new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig)
641+
.withTime(time)
642+
.withTimer(timer)
643+
.withLoader(loader)
644+
.withWriter(writer)
645+
.withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics))
646+
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
647+
.withGroupConfigManager(groupConfigManager)
648+
.withPersister(persister)
649+
.withAuthorizer(authorizer.toJava)
650+
.build()
660651
}
661652

662653
private def createShareCoordinator(): Option[ShareCoordinator] = {

core/src/main/scala/kafka/server/KafkaApis.scala

+4-40
Original file line numberDiff line numberDiff line change
@@ -1646,40 +1646,8 @@ class KafkaApis(val requestChannel: RequestChannel,
16461646
trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors")
16471647
updateErrors(producerId, currentErrors)
16481648

1649-
def maybeSendResponse(): Unit = {
1650-
if (numAppends.decrementAndGet() == 0) {
1651-
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
1652-
}
1653-
}
1654-
1655-
// The new group coordinator uses GroupCoordinator#completeTransaction so we do
1656-
// not need to call GroupCoordinator#onTransactionCompleted here.
1657-
if (config.isNewGroupCoordinatorEnabled) {
1658-
maybeSendResponse()
1659-
return
1660-
}
1661-
1662-
val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
1663-
topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
1664-
}.keys
1665-
1666-
// If no end transaction marker has been written to a __consumer_offsets partition, we do not
1667-
// need to call GroupCoordinator#onTransactionCompleted.
1668-
if (successfulOffsetsPartitions.isEmpty) {
1669-
maybeSendResponse()
1670-
return
1671-
}
1672-
1673-
// Otherwise, we call GroupCoordinator#onTransactionCompleted to materialize the offsets
1674-
// into the cache and we wait until the meterialization is completed.
1675-
groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
1676-
if (exception != null) {
1677-
error(s"Received an exception while trying to update the offsets cache on transaction marker append", exception)
1678-
val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
1679-
successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
1680-
updateErrors(producerId, updatedErrors)
1681-
}
1682-
maybeSendResponse()
1649+
if (numAppends.decrementAndGet() == 0) {
1650+
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
16831651
}
16841652
}
16851653

@@ -1727,9 +1695,7 @@ class KafkaApis(val requestChannel: RequestChannel,
17271695

17281696
val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
17291697
partitionsWithCompatibleMessageFormat.foreach { partition =>
1730-
if (groupCoordinator.isNewGroupCoordinator && partition.topic == GROUP_METADATA_TOPIC_NAME) {
1731-
// When the new group coordinator is used, writing the end marker is fully delegated
1732-
// to the group coordinator.
1698+
if (partition.topic == GROUP_METADATA_TOPIC_NAME) {
17331699
groupCoordinator.completeTransaction(
17341700
partition,
17351701
marker.producerId,
@@ -2525,7 +2491,6 @@ class KafkaApis(val requestChannel: RequestChannel,
25252491
}
25262492

25272493
def isConsumerGroupProtocolEnabled(): Boolean = {
2528-
groupCoordinator.isNewGroupCoordinator &&
25292494
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) &&
25302495
groupVersion().isConsumerRebalanceProtocolSupported
25312496
}
@@ -2658,7 +2623,6 @@ class KafkaApis(val requestChannel: RequestChannel,
26582623
}
26592624

26602625
private def isStreamsGroupProtocolEnabled(): Boolean = {
2661-
groupCoordinator.isNewGroupCoordinator &&
26622626
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
26632627
}
26642628

@@ -3843,7 +3807,7 @@ class KafkaApis(val requestChannel: RequestChannel,
38433807
}
38443808

38453809
private def isShareGroupProtocolEnabled: Boolean = {
3846-
groupCoordinator.isNewGroupCoordinator && config.shareGroupConfig.isShareGroupEnabled
3810+
config.shareGroupConfig.isShareGroupEnabled
38473811
}
38483812

38493813
private def updateRecordConversionStats(request: RequestChannel.Request,

core/src/main/scala/kafka/server/KafkaConfig.scala

+1-11
Original file line numberDiff line numberDiff line change
@@ -388,28 +388,18 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
388388
/** ********* Controlled shutdown configuration ***********/
389389
val controlledShutdownEnable = getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
390390

391-
/** New group coordinator configs */
392-
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG)
391+
/** Group coordinator configs */
393392
val groupCoordinatorRebalanceProtocols = {
394393
val protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
395394
.asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet
396395
if (!protocols.contains(GroupType.CLASSIC)) {
397396
throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.")
398397
}
399-
if (protocols.contains(GroupType.CONSUMER) && !isNewGroupCoordinatorEnabled) {
400-
warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported with the new group coordinator.")
401-
}
402398
if (protocols.contains(GroupType.SHARE)) {
403-
if (!isNewGroupCoordinatorEnabled) {
404-
warn(s"The new '${GroupType.SHARE}' rebalance protocol is only supported with the new group coordinator.")
405-
}
406399
warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " +
407400
"This is part of the early access of KIP-932 and MUST NOT be used in production.")
408401
}
409402
if (protocols.contains(GroupType.STREAMS)) {
410-
if (!isNewGroupCoordinatorEnabled) {
411-
warn(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported with the new group coordinator.")
412-
}
413403
warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance protocol are enabled. " +
414404
"This is part of the early access of KIP-1071 and MUST NOT be used in production.")
415405
}

core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala

+3-20
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
2222
import org.apache.kafka.common.test.ClusterInstance
2323
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
2424
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
25-
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
25+
import org.junit.jupiter.api.Assertions.assertEquals
2626

2727
@ClusterTestDefaults(types = Array(Type.KRAFT))
2828
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@@ -32,7 +32,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
3232
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
3333
)
3434
)
35-
def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
35+
def testDeleteGroupsWithNewConsumerGroupProtocol(): Unit = {
3636
testDeleteGroups(true)
3737
}
3838

@@ -42,28 +42,11 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
4242
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
4343
)
4444
)
45-
def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
46-
testDeleteGroups(false)
47-
}
48-
49-
@ClusterTest(
50-
types = Array(Type.KRAFT, Type.CO_KRAFT),
51-
serverProperties = Array(
52-
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
53-
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
54-
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
55-
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
56-
)
57-
)
58-
def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = {
45+
def testDeleteGroupsWithOldConsumerGroupProtocol(): Unit = {
5946
testDeleteGroups(false)
6047
}
6148

6249
private def testDeleteGroups(useNewProtocol: Boolean): Unit = {
63-
if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
64-
fail("Cannot use the new protocol with the old group coordinator.")
65-
}
66-
6750
// Creates the __consumer_offsets topics because it won't be created automatically
6851
// in this test because it does not use FindCoordinator API.
6952
createOffsetsTopic()

core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala

+1-15
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,7 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat
3232
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
3333
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
3434
))
35-
def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
36-
testDescribeGroups()
37-
}
38-
39-
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
40-
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
41-
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
42-
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
43-
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
44-
))
45-
def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = {
46-
testDescribeGroups()
47-
}
48-
49-
private def testDescribeGroups(): Unit = {
35+
def testDescribeGroupsWithOldConsumerGroupProtocol(): Unit = {
5036
// Creates the __consumer_offsets topics because it won't be created automatically
5137
// in this test because it does not use FindCoordinator API.
5238
createOffsetsTopic()

core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala

-4
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
117117
cluster.brokers.values.stream.allMatch(b => b.config.unstableApiVersionsEnabled)
118118
}
119119

120-
protected def isNewGroupCoordinatorEnabled: Boolean = {
121-
cluster.brokers.values.stream.allMatch(b => b.config.isNewGroupCoordinatorEnabled)
122-
}
123-
124120
protected def getTopicIds: Map[String, Uuid] = {
125121
cluster.controllers().get(cluster.controllerIds().iterator().next()).controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
126122
}

core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala

+1-15
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,7 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
3636
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
3737
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
3838
))
39-
def testHeartbeatWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
40-
testHeartbeat()
41-
}
42-
43-
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
44-
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
45-
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
46-
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
47-
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
48-
))
49-
def testHeartbeatWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = {
50-
testHeartbeat()
51-
}
52-
53-
private def testHeartbeat(): Unit = {
39+
def testHeartbeatWithOldConsumerGroupProtocol(): Unit = {
5440
// Creates the __consumer_offsets topics because it won't be created automatically
5541
// in this test because it does not use FindCoordinator API.
5642
createOffsetsTopic()

core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala

+1-15
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,7 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
4040
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
4141
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
4242
))
43-
def testJoinGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
44-
testJoinGroup()
45-
}
46-
47-
@ClusterTest(serverProperties = Array(
48-
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
49-
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
50-
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
51-
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
52-
))
53-
def testJoinGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = {
54-
testJoinGroup()
55-
}
56-
57-
private def testJoinGroup(): Unit = {
43+
def testJoinGroupWithOldConsumerGroupProtocol(): Unit = {
5844
// Creates the __consumer_offsets topics because it won't be created automatically
5945
// in this test because it does not use FindCoordinator API.
6046
createOffsetsTopic()

0 commit comments

Comments
 (0)