Skip to content

Commit eb88e78

Browse files
authored
KAFKA-18827: Initialize share group state group coordinator impl. [3/N] (#19026)
* This PR adds impl for the initialize share groups call from the Group Coordinator perspective. * The initialize call on persister instance will be invoked by the `GroupCoordinatorService`, based on the response of the `GroupCoordinatorShard.shareGroupHeartbeat`. If there is new topic subscription or member assignment change (topic paritions incremented), the delta share partitions corresponding to the share group in question are returned as an optional initialize request. * The request is then sent to the share coordinator as an encapsulated timer task because we want the heartbeat response to go asynchronously. * Tests have been added for `GroupCoordinatorService` and `GroupMetadataManager`. Existing tests have also been updated. * A new formatter `ShareGroupStatePartitionMetadataFormatter` has been added for debugging. Reviewers: Andrew Schofield <[email protected]>
1 parent 56d1dc1 commit eb88e78

File tree

20 files changed

+1641
-192
lines changed

20 files changed

+1641
-192
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

+154-76
Large diffs are not rendered by default.

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

+9-2
Original file line numberDiff line numberDiff line change
@@ -2726,8 +2726,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
27262726
client.listGroups(options).all.get.stream().filter(_.groupId == testGroupId).count() == 0
27272727
}, s"Expected to find zero groups")
27282728

2729-
val describeWithFakeGroupResult = client.describeShareGroups(util.Arrays.asList(testGroupId, fakeGroupId),
2730-
new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
2729+
var describeWithFakeGroupResult: DescribeShareGroupsResult = null
2730+
2731+
TestUtils.waitUntilTrue(() => {
2732+
describeWithFakeGroupResult = client.describeShareGroups(util.Arrays.asList(testGroupId, fakeGroupId),
2733+
new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
2734+
val members = describeWithFakeGroupResult.describedGroups().get(testGroupId).get().members()
2735+
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()).nonEmpty
2736+
}, s"Could not get partitions assigned. Last response $describeWithFakeGroupResult.")
2737+
27312738
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
27322739

27332740
// Test that we can get information about the test share group.

core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala

+43-19
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
125125
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
126126

127127
// Verify the response.
128-
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
128+
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
129129
assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment)
130130

131131
// Leave the group.
@@ -232,14 +232,26 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
232232

233233
// Heartbeats until the partitions are assigned for member 1.
234234
shareGroupHeartbeatResponse = null
235+
235236
TestUtils.waitUntilTrue(() => {
236237
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
237-
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data.assignment != null
238+
if (shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data().assignment() != null) {
239+
true
240+
} else {
241+
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
242+
new ShareGroupHeartbeatRequestData()
243+
.setGroupId("grp")
244+
.setMemberId(memberId1)
245+
.setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch()),
246+
true
247+
).build()
248+
false
249+
}
238250
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
239251

240252
val topicPartitionsAssignedToMember1 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
241253
// Verify the response.
242-
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
254+
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
243255

244256
// Prepare the next heartbeat for member 2.
245257
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
@@ -259,7 +271,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
259271

260272
val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
261273
// Verify the response.
262-
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
274+
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
263275

264276
val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]()
265277
topicPartitionsAssignedToMember1.forEach(topicPartition => {
@@ -290,7 +302,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
290302
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
291303

292304
// Verify the response.
293-
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
305+
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
294306
} finally {
295307
admin.close()
296308
}
@@ -369,7 +381,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
369381
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
370382

371383
// Verify the response.
372-
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
384+
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
373385

374386
// Member leaves the group.
375387
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
@@ -402,7 +414,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
402414
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
403415

404416
// Verify the response for member 1.
405-
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
417+
assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
406418
assertEquals(memberId, shareGroupHeartbeatResponse.data.memberId)
407419
// Partition assignment remains intact on rejoining.
408420
assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment)
@@ -491,7 +503,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
491503
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
492504
}, msg = s"Could not get partitions for topic foo and bar assigned. Last response $shareGroupHeartbeatResponse.")
493505
// Verify the response.
494-
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
506+
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
495507
// Create the topic baz.
496508
val bazTopicId = TestUtils.createTopicWithAdminRaw(
497509
admin = admin,
@@ -515,7 +527,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
515527
new ShareGroupHeartbeatRequestData()
516528
.setGroupId("grp")
517529
.setMemberId(memberId)
518-
.setMemberEpoch(2),
530+
.setMemberEpoch(3),
519531
true
520532
).build()
521533

@@ -527,7 +539,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
527539
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
528540
}, msg = s"Could not get partitions for topic baz assigned. Last response $shareGroupHeartbeatResponse.")
529541
// Verify the response.
530-
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
542+
assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
531543
// Increasing the partitions of topic bar which is already being consumed in the share group.
532544
increasePartitions(admin, "bar", 6, Seq.empty)
533545

@@ -547,7 +559,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
547559
new ShareGroupHeartbeatRequestData()
548560
.setGroupId("grp")
549561
.setMemberId(memberId)
550-
.setMemberEpoch(3),
562+
.setMemberEpoch(5),
551563
true
552564
).build()
553565

@@ -559,7 +571,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
559571
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
560572
}, msg = s"Could not update partitions assignment for topic bar. Last response $shareGroupHeartbeatResponse.")
561573
// Verify the response.
562-
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
574+
assertEquals(7, shareGroupHeartbeatResponse.data.memberEpoch)
563575
// Delete the topic foo.
564576
TestUtils.deleteTopicWithAdmin(
565577
admin = admin,
@@ -581,7 +593,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
581593
new ShareGroupHeartbeatRequestData()
582594
.setGroupId("grp")
583595
.setMemberId(memberId)
584-
.setMemberEpoch(4),
596+
.setMemberEpoch(7),
585597
true
586598
).build()
587599

@@ -593,7 +605,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
593605
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
594606
}, msg = s"Could not update partitions assignment for topic foo. Last response $shareGroupHeartbeatResponse.")
595607
// Verify the response.
596-
assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
608+
assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch)
597609
} finally {
598610
admin.close()
599611
}
@@ -704,12 +716,24 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
704716
.setTopicId(barId)
705717
.setPartitions(List[Integer](0).asJava)).asJava)
706718

719+
shareGroupHeartbeatResponse = null
720+
707721
TestUtils.waitUntilTrue(() => {
708722
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
709-
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
710-
shareGroupHeartbeatResponse.data.assignment != null &&
723+
if (shareGroupHeartbeatResponse.data.assignment != null &&
711724
expectedAssignment.topicPartitions.containsAll(shareGroupHeartbeatResponse.data.assignment.topicPartitions) &&
712-
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
725+
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)) {
726+
true
727+
} else {
728+
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
729+
new ShareGroupHeartbeatRequestData()
730+
.setGroupId("grp")
731+
.setMemberId(memberId)
732+
.setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch),
733+
true
734+
).build()
735+
false
736+
}
713737
}, msg = s"Could not get bar partitions assigned. Last response $shareGroupHeartbeatResponse.")
714738

715739
// Verify the response, the epoch should have been bumped.
@@ -840,7 +864,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
840864
shareGroupHeartbeatResponse.data.assignment == expectedAssignment
841865
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
842866
// Verify the response.
843-
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
867+
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
844868

845869
// Restart the only running broker.
846870
val broker = cluster.brokers().values().iterator().next()
@@ -864,7 +888,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
864888

865889
// Verify the response. Epoch should not have changed and null assignments determines that no
866890
// change in old assignment.
867-
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
891+
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
868892
assertNull(shareGroupHeartbeatResponse.data.assignment)
869893
} finally {
870894
admin.close()

group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscribedTopicDescriber.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public interface SubscribedTopicDescriber {
3030
*
3131
* @param topicId Uuid corresponding to the topic.
3232
* @return The number of partitions corresponding to the given topic Id,
33-
* or -1 if the topic Id does not exist.
33+
* or -1 if the topic id does not exist.
3434
*/
3535
int numPartitions(Uuid topicId);
3636

@@ -43,4 +43,14 @@ public interface SubscribedTopicDescriber {
4343
* If the topic Id does not exist, an empty set is returned.
4444
*/
4545
Set<String> racksForPartition(Uuid topicId, int partition);
46+
47+
/**
48+
* Returns a set of partitions corresponding to a topic id which
49+
* are allowlisted based on some criteria. For example, for share groups
50+
* only partitions which have been initialized are returned.
51+
*
52+
* @param topicId The uuid of the topic
53+
* @return The set of integers representing assignable partitions. Could be empty, or contain all partitions.
54+
*/
55+
Set<Integer> assignablePartitions(Uuid topicId);
4656
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java

+40
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
4848
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
4949
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
50+
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
51+
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
5052
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
5153
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
5254
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
@@ -812,6 +814,44 @@ public static CoordinatorRecord newShareGroupCurrentAssignmentTombstoneRecord(
812814
);
813815
}
814816

817+
/**
818+
* Creates a ShareGroupStatePartitionMetadata record.
819+
*
820+
* @param groupId The share group id.
821+
* @param initializedTopics Topics which have been initialized.
822+
* @param deletingTopics Topics which are being deleted.
823+
* @return The record.
824+
*/
825+
public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord(
826+
String groupId,
827+
Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics,
828+
Map<Uuid, String> deletingTopics
829+
) {
830+
List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initializedTopicPartitionInfo = initializedTopics.entrySet().stream()
831+
.map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
832+
.setTopicId(entry.getKey())
833+
.setTopicName(entry.getValue().getKey())
834+
.setPartitions(entry.getValue().getValue().stream().toList()))
835+
.toList();
836+
837+
List<ShareGroupStatePartitionMetadataValue.TopicInfo> deletingTopicsInfo = deletingTopics.entrySet().stream()
838+
.map(entry -> new ShareGroupStatePartitionMetadataValue.TopicInfo()
839+
.setTopicId(entry.getKey())
840+
.setTopicName(entry.getValue()))
841+
.toList();
842+
843+
return CoordinatorRecord.record(
844+
new ShareGroupStatePartitionMetadataKey()
845+
.setGroupId(groupId),
846+
new ApiMessageAndVersion(
847+
new ShareGroupStatePartitionMetadataValue()
848+
.setInitializedTopics(initializedTopicPartitionInfo)
849+
.setDeletingTopics(deletingTopicsInfo),
850+
(short) 0
851+
)
852+
);
853+
}
854+
815855
private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
816856
Map<Uuid, Set<Integer>> topicPartitions
817857
) {

0 commit comments

Comments
 (0)