Skip to content

Commit cbe6f3d

Browse files
committed
KAFKA-18613: Unit tests for usage of incorrect RPCs / group type pairs
In the GroupMetadataManager, we may call an RPC on an incorrect group type. This adds unit tests to validate the behavior when an RPC is used on an incorrect group type.
1 parent f9bac3e commit cbe6f3d

File tree

1 file changed

+344
-0
lines changed

1 file changed

+344
-0
lines changed

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

+344
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
import java.nio.ByteBuffer;
139139
import java.util.ArrayList;
140140
import java.util.Arrays;
141+
import java.util.Collections;
141142
import java.util.HashMap;
142143
import java.util.LinkedHashMap;
143144
import java.util.List;
@@ -18685,6 +18686,349 @@ public void testShareGroupDescribeOnConsumerGroup() {
1868518686
assertEquals(expected, actual);
1868618687
}
1868718688

18689+
@Test
18690+
public void testConsumerGroupHeartbeatOnStreamsGroup() {
18691+
String groupId = "group-foo";
18692+
String memberId = Uuid.randomUuid().toString();
18693+
18694+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18695+
.withMetadataImage(MetadataImage.EMPTY)
18696+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18697+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18698+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18699+
.setMemberEpoch(1)
18700+
.setPreviousMemberEpoch(0)
18701+
.setClientId(DEFAULT_CLIENT_ID)
18702+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18703+
.build())
18704+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18705+
.withTargetAssignmentEpoch(1))
18706+
.build();
18707+
18708+
assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat(
18709+
new ConsumerGroupHeartbeatRequestData()
18710+
.setMemberId(memberId)
18711+
.setGroupId(groupId)
18712+
.setMemberEpoch(0)
18713+
.setServerAssignor("range")
18714+
.setRebalanceTimeoutMs(5000)
18715+
.setSubscribedTopicNames(List.of("foo", "bar"))
18716+
.setTopicPartitions(Collections.emptyList())));
18717+
}
18718+
18719+
@Test
18720+
public void testShareGroupHeartbeatOnStreamsGroup() {
18721+
String groupId = "group-foo";
18722+
String memberId = Uuid.randomUuid().toString();
18723+
18724+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18725+
.withMetadataImage(MetadataImage.EMPTY)
18726+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18727+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18728+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18729+
.setMemberEpoch(1)
18730+
.setPreviousMemberEpoch(0)
18731+
.setClientId(DEFAULT_CLIENT_ID)
18732+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18733+
.build())
18734+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18735+
.withTargetAssignmentEpoch(1))
18736+
.build();
18737+
18738+
assertThrows(GroupIdNotFoundException.class, () -> context.shareGroupHeartbeat(
18739+
new ShareGroupHeartbeatRequestData()
18740+
.setMemberId(memberId)
18741+
.setGroupId(groupId)
18742+
.setMemberEpoch(0)
18743+
.setSubscribedTopicNames(List.of("foo", "bar"))));
18744+
}
18745+
18746+
@Test
18747+
public void testClassicGroupJoinOnStreamsGroup() throws Exception {
18748+
String groupId = "group-foo";
18749+
String memberId = Uuid.randomUuid().toString();
18750+
18751+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18752+
.withMetadataImage(MetadataImage.EMPTY)
18753+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18754+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18755+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18756+
.setMemberEpoch(1)
18757+
.setPreviousMemberEpoch(0)
18758+
.setClientId(DEFAULT_CLIENT_ID)
18759+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18760+
.build())
18761+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18762+
.withTargetAssignmentEpoch(1))
18763+
.build();
18764+
18765+
JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
18766+
.withGroupId(groupId)
18767+
.withMemberId(UNKNOWN_MEMBER_ID)
18768+
.withProtocolType("consumer")
18769+
.withProtocols(new JoinGroupRequestProtocolCollection(0))
18770+
.build();
18771+
18772+
GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request);
18773+
assertTrue(joinResult.joinFuture.isDone());
18774+
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), joinResult.joinFuture.get().errorCode());
18775+
}
18776+
18777+
@Test
18778+
public void testClassicGroupSyncOnStreamsGroup() throws Exception {
18779+
String groupId = "group-foo";
18780+
String memberId = Uuid.randomUuid().toString();
18781+
18782+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18783+
.withMetadataImage(MetadataImage.EMPTY)
18784+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18785+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18786+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18787+
.setMemberEpoch(1)
18788+
.setPreviousMemberEpoch(0)
18789+
.setClientId(DEFAULT_CLIENT_ID)
18790+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18791+
.build())
18792+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18793+
.withTargetAssignmentEpoch(1))
18794+
.build();
18795+
18796+
SyncGroupRequestData request = new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
18797+
.withGroupId(groupId)
18798+
.withGenerationId(1)
18799+
.withMemberId(memberId)
18800+
.build();
18801+
18802+
GroupMetadataManagerTestContext.SyncResult syncResult = context.sendClassicGroupSync(request);
18803+
18804+
assertTrue(syncResult.records.isEmpty());
18805+
assertTrue(syncResult.syncFuture.isDone());
18806+
assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), syncResult.syncFuture.get().errorCode());
18807+
}
18808+
18809+
@Test
18810+
public void testClassicGroupLeaveOnStreamsGroup() throws Exception {
18811+
String groupId = "group-foo";
18812+
String memberId = Uuid.randomUuid().toString();
18813+
18814+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18815+
.withMetadataImage(MetadataImage.EMPTY)
18816+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18817+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18818+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18819+
.setMemberEpoch(1)
18820+
.setPreviousMemberEpoch(0)
18821+
.setClientId(DEFAULT_CLIENT_ID)
18822+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18823+
.build())
18824+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18825+
.withTargetAssignmentEpoch(1))
18826+
.build();
18827+
18828+
assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupLeave(
18829+
new LeaveGroupRequestData()
18830+
.setGroupId(groupId)
18831+
.setMembers(List.of(
18832+
new MemberIdentity()
18833+
.setMemberId(memberId)))));
18834+
}
18835+
18836+
@Test
18837+
public void testConsumerGroupDescribeOnStreamsGroup() {
18838+
String groupId = "group-foo";
18839+
String memberId = Uuid.randomUuid().toString();
18840+
18841+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18842+
.withMetadataImage(MetadataImage.EMPTY)
18843+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18844+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18845+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18846+
.setMemberEpoch(1)
18847+
.setPreviousMemberEpoch(0)
18848+
.setClientId(DEFAULT_CLIENT_ID)
18849+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18850+
.build())
18851+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18852+
.withTargetAssignmentEpoch(1))
18853+
.build();
18854+
18855+
List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = List.of(
18856+
new ConsumerGroupDescribeResponseData.DescribedGroup()
18857+
.setGroupId(groupId)
18858+
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
18859+
.setErrorMessage("Group " + groupId + " is not a consumer group.")
18860+
);
18861+
18862+
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(List.of(groupId));
18863+
assertEquals(expected, actual);
18864+
}
18865+
18866+
@Test
18867+
public void testShareGroupDescribeOnStreamsGroup() {
18868+
String groupId = "group-foo";
18869+
String memberId = Uuid.randomUuid().toString();
18870+
18871+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18872+
.withMetadataImage(MetadataImage.EMPTY)
18873+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 1)
18874+
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
18875+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
18876+
.setMemberEpoch(1)
18877+
.setPreviousMemberEpoch(0)
18878+
.setClientId(DEFAULT_CLIENT_ID)
18879+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18880+
.build())
18881+
.withTargetAssignment(memberId, TasksTuple.EMPTY)
18882+
.withTargetAssignmentEpoch(1))
18883+
.build();
18884+
18885+
List<ShareGroupDescribeResponseData.DescribedGroup> expected = List.of(
18886+
new ShareGroupDescribeResponseData.DescribedGroup()
18887+
.setGroupId(groupId)
18888+
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
18889+
.setErrorMessage("Group " + groupId + " is not a share group.")
18890+
);
18891+
18892+
List<ShareGroupDescribeResponseData.DescribedGroup> actual = context.sendShareGroupDescribe(List.of(groupId));
18893+
assertEquals(expected, actual);
18894+
}
18895+
18896+
@Test
18897+
public void testStreamsGroupHeartbeatOnConsumerGroup() {
18898+
String groupId = "group-foo";
18899+
// Use a static member id as it makes the test easier.
18900+
String memberId1 = Uuid.randomUuid().toString();
18901+
18902+
Uuid fooTopicId = Uuid.randomUuid();
18903+
String fooTopicName = "foo";
18904+
18905+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
18906+
18907+
// Consumer group with one static member.
18908+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18909+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
18910+
.withMetadataImage(new MetadataImageBuilder()
18911+
.addTopic(fooTopicId, fooTopicName, 6)
18912+
.build())
18913+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
18914+
.withMember(new ConsumerGroupMember.Builder(memberId1)
18915+
.setState(MemberState.STABLE)
18916+
.setInstanceId(memberId1)
18917+
.setMemberEpoch(10)
18918+
.setPreviousMemberEpoch(9)
18919+
.setClientId(DEFAULT_CLIENT_ID)
18920+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18921+
.setSubscribedTopicNames(List.of("foo", "bar"))
18922+
.setServerAssignorName("range")
18923+
.setAssignedPartitions(mkAssignment(
18924+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
18925+
.build())
18926+
.withAssignment(memberId1, mkAssignment(
18927+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
18928+
.withAssignmentEpoch(10))
18929+
.build();
18930+
18931+
assertThrows(GroupIdNotFoundException.class, () ->
18932+
context.streamsGroupHeartbeat(
18933+
new StreamsGroupHeartbeatRequestData()
18934+
.setGroupId(groupId)
18935+
.setMemberId(Uuid.randomUuid().toString())
18936+
.setMemberEpoch(1)));
18937+
}
18938+
18939+
@Test
18940+
public void testStreamsGroupDescribeOnConsumerGroup() {
18941+
String groupId = "group-foo";
18942+
String memberId = Uuid.randomUuid().toString();
18943+
18944+
int epoch = 10;
18945+
String topicName = "topicName";
18946+
ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId)
18947+
.setSubscribedTopicNames(List.of(topicName))
18948+
.setServerAssignorName("assignorName");
18949+
18950+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
18951+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18952+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
18953+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, epoch)
18954+
.withMember(memberBuilder.build()))
18955+
.build();
18956+
18957+
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = List.of(
18958+
new StreamsGroupDescribeResponseData.DescribedGroup()
18959+
.setGroupId(groupId)
18960+
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
18961+
.setErrorMessage("Group " + groupId + " is not a streams group.")
18962+
);
18963+
18964+
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(List.of(groupId));
18965+
assertEquals(expected, actual);
18966+
}
18967+
18968+
18969+
@Test
18970+
public void testStreamsGroupHeartbeatOnShareGroup() {
18971+
String groupId = "group-foo";
18972+
String memberId1 = Uuid.randomUuid().toString();
18973+
18974+
Uuid fooTopicId = Uuid.randomUuid();
18975+
String fooTopicName = "foo";
18976+
18977+
// Share group with one member.
18978+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
18979+
.withMetadataImage(new MetadataImageBuilder()
18980+
.addTopic(fooTopicId, fooTopicName, 6)
18981+
.build())
18982+
.withShareGroup(new ShareGroupBuilder(groupId, 10)
18983+
.withMember(new ShareGroupMember.Builder(memberId1)
18984+
.setState(MemberState.STABLE)
18985+
.setMemberEpoch(10)
18986+
.setPreviousMemberEpoch(9)
18987+
.setClientId(DEFAULT_CLIENT_ID)
18988+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
18989+
.setSubscribedTopicNames(List.of("foo", "bar"))
18990+
.setAssignedPartitions(mkAssignment(
18991+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
18992+
.build())
18993+
.withAssignment(memberId1, mkAssignment(
18994+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
18995+
.withAssignmentEpoch(10))
18996+
.build();
18997+
18998+
assertThrows(GroupIdNotFoundException.class, () ->
18999+
context.streamsGroupHeartbeat(
19000+
new StreamsGroupHeartbeatRequestData()
19001+
.setGroupId(groupId)
19002+
.setMemberId(Uuid.randomUuid().toString())
19003+
.setMemberEpoch(1)));
19004+
}
19005+
19006+
@Test
19007+
public void testStreamsGroupDescribeOnShareGroup() {
19008+
String groupId = "group-foo";
19009+
String memberId = Uuid.randomUuid().toString();
19010+
19011+
int epoch = 10;
19012+
String topicName = "topicName";
19013+
ShareGroupMember.Builder memberBuilder = new ShareGroupMember.Builder(memberId)
19014+
.setSubscribedTopicNames(List.of(topicName));
19015+
19016+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
19017+
.withShareGroup(new ShareGroupBuilder(groupId, epoch)
19018+
.withMember(memberBuilder.build()))
19019+
.build();
19020+
19021+
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = List.of(
19022+
new StreamsGroupDescribeResponseData.DescribedGroup()
19023+
.setGroupId(groupId)
19024+
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
19025+
.setErrorMessage("Group " + groupId + " is not a streams group.")
19026+
);
19027+
19028+
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(List.of(groupId));
19029+
assertEquals(expected, actual);
19030+
}
19031+
1868819032
@Test
1868919033
public void testReplayConsumerGroupRegularExpression() {
1869019034
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()

0 commit comments

Comments
 (0)