Skip to content

Commit e380968

Browse files
authored
KAFKA-18613: Unit tests for usage of incorrect RPCs (#18383)
In the GroupMetadataManager, we may call an RPC on an incorrect group type. This adds unit tests to validate the behavior when an RPC is used on an incorrect group type. Reviewers: Bill Bejeck <[email protected]>
1 parent 6f4d425 commit e380968

File tree

1 file changed

+344
-0
lines changed

1 file changed

+344
-0
lines changed

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

+344
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@
144144
import java.nio.ByteBuffer;
145145
import java.util.ArrayList;
146146
import java.util.Arrays;
147+
import java.util.Collections;
147148
import java.util.HashMap;
148149
import java.util.HashSet;
149150
import java.util.LinkedHashMap;
@@ -18886,6 +18887,349 @@ public void testShareGroupDescribeOnConsumerGroup() {
1888618887
assertEquals(expected, actual);
1888718888
}
1888818889

18890+
@Test
18891+
public void testConsumerGroupHeartbeatOnStreamsGroup() {
18892+
String groupId = "group-foo";
18893+
String memberId = Uuid.randomUuid().toString();
18894+
18895+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18896+
.withMetadataImage(MetadataImage.EMPTY)
18897+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18898+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18899+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18900+
.setMemberEpoch(1)
18901+
.setPreviousMemberEpoch(0)
18902+
.setClientId(DEFAULT_CLIENT_ID)
18903+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18904+
.build())
18905+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18906+
.withTargetAssignmentEpoch(1))
18907+
.build();
18908+
18909+
assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat(
18910+
new ConsumerGroupHeartbeatRequestData()
18911+
.setMemberId(memberId)
18912+
.setGroupId(groupId)
18913+
.setMemberEpoch(0)
18914+
.setServerAssignor("range")
18915+
.setRebalanceTimeoutMs(5000)
18916+
.setSubscribedTopicNames(List.of("foo", "bar"))
18917+
.setTopicPartitions(Collections.emptyList())));
18918+
}
18919+
18920+
@Test
18921+
public void testShareGroupHeartbeatOnStreamsGroup() {
18922+
String groupId = "group-foo";
18923+
String memberId = Uuid.randomUuid().toString();
18924+
18925+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18926+
.withMetadataImage(MetadataImage.EMPTY)
18927+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18928+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18929+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18930+
.setMemberEpoch(1)
18931+
.setPreviousMemberEpoch(0)
18932+
.setClientId(DEFAULT_CLIENT_ID)
18933+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18934+
.build())
18935+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18936+
.withTargetAssignmentEpoch(1))
18937+
.build();
18938+
18939+
assertThrows(GroupIdNotFoundException.class, () -> context.shareGroupHeartbeat(
18940+
new ShareGroupHeartbeatRequestData()
18941+
.setMemberId(memberId)
18942+
.setGroupId(groupId)
18943+
.setMemberEpoch(0)
18944+
.setSubscribedTopicNames(List.of("foo", "bar"))));
18945+
}
18946+
18947+
@Test
18948+
public void testClassicGroupJoinOnStreamsGroup() throws Exception {
18949+
String groupId = "group-foo";
18950+
String memberId = Uuid.randomUuid().toString();
18951+
18952+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18953+
.withMetadataImage(MetadataImage.EMPTY)
18954+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18955+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18956+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18957+
.setMemberEpoch(1)
18958+
.setPreviousMemberEpoch(0)
18959+
.setClientId(DEFAULT_CLIENT_ID)
18960+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18961+
.build())
18962+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18963+
.withTargetAssignmentEpoch(1))
18964+
.build();
18965+
18966+
JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
18967+
.withGroupId(groupId)
18968+
.withMemberId(UNKNOWN_MEMBER_ID)
18969+
.withProtocolType("consumer")
18970+
.withProtocols(new JoinGroupRequestProtocolCollection(0))
18971+
.build();
18972+
18973+
GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request);
18974+
assertTrue(joinResult.joinFuture.isDone());
18975+
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), joinResult.joinFuture.get().errorCode());
18976+
}
18977+
18978+
@Test
18979+
public void testClassicGroupSyncOnStreamsGroup() throws Exception {
18980+
String groupId = "group-foo";
18981+
String memberId = Uuid.randomUuid().toString();
18982+
18983+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18984+
.withMetadataImage(MetadataImage.EMPTY)
18985+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18986+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18987+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18988+
.setMemberEpoch(1)
18989+
.setPreviousMemberEpoch(0)
18990+
.setClientId(DEFAULT_CLIENT_ID)
18991+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18992+
.build())
18993+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18994+
.withTargetAssignmentEpoch(1))
18995+
.build();
18996+
18997+
SyncGroupRequestData request = new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
18998+
.withGroupId(groupId)
18999+
.withGenerationId(1)
19000+
.withMemberId(memberId)
19001+
.build();
19002+
19003+
GroupMetadataManagerTestContext.SyncResult syncResult = context.sendClassicGroupSync(request);
19004+
19005+
assertTrue(syncResult.records.isEmpty());
19006+
assertTrue(syncResult.syncFuture.isDone());
19007+
assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), syncResult.syncFuture.get().errorCode());
19008+
}
19009+
19010+
@Test
19011+
public void testClassicGroupLeaveOnStreamsGroup() throws Exception {
19012+
String groupId = "group-foo";
19013+
String memberId = Uuid.randomUuid().toString();
19014+
19015+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
19016+
.withMetadataImage(MetadataImage.EMPTY)
19017+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
19018+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
19019+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
19020+
.setMemberEpoch(1)
19021+
.setPreviousMemberEpoch(0)
19022+
.setClientId(DEFAULT_CLIENT_ID)
19023+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
19024+
.build())
19025+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
19026+
.withTargetAssignmentEpoch(1))
19027+
.build();
19028+
19029+
assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupLeave(
19030+
new LeaveGroupRequestData()
19031+
.setGroupId(groupId)
19032+
.setMembers(List.of(
19033+
new MemberIdentity()
19034+
.setMemberId(memberId)))));
19035+
}
19036+
19037+
@Test
19038+
public void testConsumerGroupDescribeOnStreamsGroup() {
19039+
String groupId = "group-foo";
19040+
String memberId = Uuid.randomUuid().toString();
19041+
19042+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
19043+
.withMetadataImage(MetadataImage.EMPTY)
19044+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
19045+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
19046+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
19047+
.setMemberEpoch(1)
19048+
.setPreviousMemberEpoch(0)
19049+
.setClientId(DEFAULT_CLIENT_ID)
19050+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
19051+
.build())
19052+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
19053+
.withTargetAssignmentEpoch(1))
19054+
.build();
19055+
19056+
List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = List.of(
19057+
new ConsumerGroupDescribeResponseData.DescribedGroup()
19058+
.setGroupId(groupId)
19059+
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
19060+
.setErrorMessage("Group " + groupId + " is not a consumer group.")
19061+
);
19062+
19063+
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(List.of(groupId));
19064+
assertEquals(expected, actual);
19065+
}
19066+
19067+
@Test
19068+
public void testShareGroupDescribeOnStreamsGroup() {
19069+
String groupId = "group-foo";
19070+
String memberId = Uuid.randomUuid().toString();
19071+
19072+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
19073+
.withMetadataImage(MetadataImage.EMPTY)
19074+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
19075+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
19076+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
19077+
.setMemberEpoch(1)
19078+
.setPreviousMemberEpoch(0)
19079+
.setClientId(DEFAULT_CLIENT_ID)
19080+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
19081+
.build())
19082+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
19083+
.withTargetAssignmentEpoch(1))
19084+
.build();
19085+
19086+
List<ShareGroupDescribeResponseData.DescribedGroup> expected = List.of(
19087+
new ShareGroupDescribeResponseData.DescribedGroup()
19088+
.setGroupId(groupId)
19089+
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
19090+
.setErrorMessage("Group " + groupId + " is not a share group.")
19091+
);
19092+
19093+
List<ShareGroupDescribeResponseData.DescribedGroup> actual = context.sendShareGroupDescribe(List.of(groupId));
19094+
assertEquals(expected, actual);
19095+
}
19096+
19097+
@Test
19098+
public void testStreamsGroupHeartbeatOnConsumerGroup() {
19099+
String groupId = "group-foo";
19100+
// Use a static member id as it makes the test easier.
19101+
String memberId1 = Uuid.randomUuid().toString();
19102+
19103+
Uuid fooTopicId = Uuid.randomUuid();
19104+
String fooTopicName = "foo";
19105+
19106+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
19107+
19108+
// Consumer group with one static member.
19109+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
19110+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
19111+
.withMetadataImage(new MetadataImageBuilder()
19112+
.addTopic(fooTopicId, fooTopicName, 6)
19113+
.build())
19114+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
19115+
.withMember(new ConsumerGroupMember.Builder(memberId1)
19116+
.setState(MemberState.STABLE)
19117+
.setInstanceId(memberId1)
19118+
.setMemberEpoch(10)
19119+
.setPreviousMemberEpoch(9)
19120+
.setClientId(DEFAULT_CLIENT_ID)
19121+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
19122+
.setSubscribedTopicNames(List.of("foo", "bar"))
19123+
.setServerAssignorName("range")
19124+
.setAssignedPartitions(mkAssignment(
19125+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
19126+
.build())
19127+
.withAssignment(memberId1, mkAssignment(
19128+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
19129+
.withAssignmentEpoch(10))
19130+
.build();
19131+
19132+
assertThrows(GroupIdNotFoundException.class, () ->
19133+
context.streamsGroupHeartbeat(
19134+
new StreamsGroupHeartbeatRequestData()
19135+
.setGroupId(groupId)
19136+
.setMemberId(Uuid.randomUuid().toString())
19137+
.setMemberEpoch(1)));
19138+
}
19139+
19140+
@Test
19141+
public void testStreamsGroupDescribeOnConsumerGroup() {
19142+
String groupId = "group-foo";
19143+
String memberId = Uuid.randomUuid().toString();
19144+
19145+
int epoch = 10;
19146+
String topicName = "topicName";
19147+
ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId)
19148+
.setSubscribedTopicNames(List.of(topicName))
19149+
.setServerAssignorName("assignorName");
19150+
19151+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
19152+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
19153+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
19154+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, epoch)
19155+
.withMember(memberBuilder.build()))
19156+
.build();
19157+
19158+
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = List.of(
19159+
new StreamsGroupDescribeResponseData.DescribedGroup()
19160+
.setGroupId(groupId)
19161+
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
19162+
.setErrorMessage("Group " + groupId + " is not a streams group.")
19163+
);
19164+
19165+
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(List.of(groupId));
19166+
assertEquals(expected, actual);
19167+
}
19168+
19169+
19170+
@Test
19171+
public void testStreamsGroupHeartbeatOnShareGroup() {
19172+
String groupId = "group-foo";
19173+
String memberId1 = Uuid.randomUuid().toString();
19174+
19175+
Uuid fooTopicId = Uuid.randomUuid();
19176+
String fooTopicName = "foo";
19177+
19178+
// Share group with one member.
19179+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
19180+
.withMetadataImage(new MetadataImageBuilder()
19181+
.addTopic(fooTopicId, fooTopicName, 6)
19182+
.build())
19183+
.withShareGroup(new ShareGroupBuilder(groupId, 10)
19184+
.withMember(new ShareGroupMember.Builder(memberId1)
19185+
.setState(MemberState.STABLE)
19186+
.setMemberEpoch(10)
19187+
.setPreviousMemberEpoch(9)
19188+
.setClientId(DEFAULT_CLIENT_ID)
19189+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
19190+
.setSubscribedTopicNames(List.of("foo", "bar"))
19191+
.setAssignedPartitions(mkAssignment(
19192+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
19193+
.build())
19194+
.withAssignment(memberId1, mkAssignment(
19195+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
19196+
.withAssignmentEpoch(10))
19197+
.build();
19198+
19199+
assertThrows(GroupIdNotFoundException.class, () ->
19200+
context.streamsGroupHeartbeat(
19201+
new StreamsGroupHeartbeatRequestData()
19202+
.setGroupId(groupId)
19203+
.setMemberId(Uuid.randomUuid().toString())
19204+
.setMemberEpoch(1)));
19205+
}
19206+
19207+
@Test
19208+
public void testStreamsGroupDescribeOnShareGroup() {
19209+
String groupId = "group-foo";
19210+
String memberId = Uuid.randomUuid().toString();
19211+
19212+
int epoch = 10;
19213+
String topicName = "topicName";
19214+
ShareGroupMember.Builder memberBuilder = new ShareGroupMember.Builder(memberId)
19215+
.setSubscribedTopicNames(List.of(topicName));
19216+
19217+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
19218+
.withShareGroup(new ShareGroupBuilder(groupId, epoch)
19219+
.withMember(memberBuilder.build()))
19220+
.build();
19221+
19222+
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = List.of(
19223+
new StreamsGroupDescribeResponseData.DescribedGroup()
19224+
.setGroupId(groupId)
19225+
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
19226+
.setErrorMessage("Group " + groupId + " is not a streams group.")
19227+
);
19228+
19229+
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(List.of(groupId));
19230+
assertEquals(expected, actual);
19231+
}
19232+
1888919233
@Test
1889019234
public void testReplayConsumerGroupRegularExpression() {
1889119235
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()

0 commit comments

Comments
 (0)