|
138 | 138 | import java.nio.ByteBuffer;
|
139 | 139 | import java.util.ArrayList;
|
140 | 140 | import java.util.Arrays;
|
| 141 | +import java.util.Collections; |
141 | 142 | import java.util.HashMap;
|
142 | 143 | import java.util.LinkedHashMap;
|
143 | 144 | import java.util.List;
|
@@ -18374,6 +18375,349 @@ public void testShareGroupDescribeOnConsumerGroup() {
|
18374 | 18375 | assertEquals(expected, actual);
|
18375 | 18376 | }
|
18376 | 18377 |
|
| 18378 | + @Test |
| 18379 | + public void testConsumerGroupHeartbeatOnStreamsGroup() { |
| 18380 | + String groupId = "group-foo"; |
| 18381 | + String memberId = Uuid.randomUuid().toString(); |
| 18382 | + |
| 18383 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18384 | + .withMetadataImage(MetadataImage.EMPTY) |
| 18385 | + .withStreamsGroup(new StreamsGroupBuilder(groupId, 1) |
| 18386 | + .withMember(StreamsGroupMember.Builder.withDefaults(memberId) |
| 18387 | + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) |
| 18388 | + .setMemberEpoch(1) |
| 18389 | + .setPreviousMemberEpoch(0) |
| 18390 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18391 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18392 | + .build()) |
| 18393 | + .withTargetAssignment(memberId, TasksTuple.EMPTY) |
| 18394 | + .withTargetAssignmentEpoch(1)) |
| 18395 | + .build(); |
| 18396 | + |
| 18397 | + assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat( |
| 18398 | + new ConsumerGroupHeartbeatRequestData() |
| 18399 | + .setMemberId(memberId) |
| 18400 | + .setGroupId(groupId) |
| 18401 | + .setMemberEpoch(0) |
| 18402 | + .setServerAssignor("range") |
| 18403 | + .setRebalanceTimeoutMs(5000) |
| 18404 | + .setSubscribedTopicNames(List.of("foo", "bar")) |
| 18405 | + .setTopicPartitions(Collections.emptyList()))); |
| 18406 | + } |
| 18407 | + |
| 18408 | + @Test |
| 18409 | + public void testShareGroupHeartbeatOnStreamsGroup() { |
| 18410 | + String groupId = "group-foo"; |
| 18411 | + String memberId = Uuid.randomUuid().toString(); |
| 18412 | + |
| 18413 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18414 | + .withMetadataImage(MetadataImage.EMPTY) |
| 18415 | + .withStreamsGroup(new StreamsGroupBuilder(groupId, 1) |
| 18416 | + .withMember(StreamsGroupMember.Builder.withDefaults(memberId) |
| 18417 | + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) |
| 18418 | + .setMemberEpoch(1) |
| 18419 | + .setPreviousMemberEpoch(0) |
| 18420 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18421 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18422 | + .build()) |
| 18423 | + .withTargetAssignment(memberId, TasksTuple.EMPTY) |
| 18424 | + .withTargetAssignmentEpoch(1)) |
| 18425 | + .build(); |
| 18426 | + |
| 18427 | + assertThrows(GroupIdNotFoundException.class, () -> context.shareGroupHeartbeat( |
| 18428 | + new ShareGroupHeartbeatRequestData() |
| 18429 | + .setMemberId(memberId) |
| 18430 | + .setGroupId(groupId) |
| 18431 | + .setMemberEpoch(0) |
| 18432 | + .setSubscribedTopicNames(List.of("foo", "bar")))); |
| 18433 | + } |
| 18434 | + |
| 18435 | + @Test |
| 18436 | + public void testClassicGroupJoinOnStreamsGroup() throws Exception { |
| 18437 | + String groupId = "group-foo"; |
| 18438 | + String memberId = Uuid.randomUuid().toString(); |
| 18439 | + |
| 18440 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18441 | + .withMetadataImage(MetadataImage.EMPTY) |
| 18442 | + .withStreamsGroup(new StreamsGroupBuilder(groupId, 1) |
| 18443 | + .withMember(StreamsGroupMember.Builder.withDefaults(memberId) |
| 18444 | + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) |
| 18445 | + .setMemberEpoch(1) |
| 18446 | + .setPreviousMemberEpoch(0) |
| 18447 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18448 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18449 | + .build()) |
| 18450 | + .withTargetAssignment(memberId, TasksTuple.EMPTY) |
| 18451 | + .withTargetAssignmentEpoch(1)) |
| 18452 | + .build(); |
| 18453 | + |
| 18454 | + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() |
| 18455 | + .withGroupId(groupId) |
| 18456 | + .withMemberId(UNKNOWN_MEMBER_ID) |
| 18457 | + .withProtocolType("consumer") |
| 18458 | + .withProtocols(new JoinGroupRequestProtocolCollection(0)) |
| 18459 | + .build(); |
| 18460 | + |
| 18461 | + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); |
| 18462 | + assertTrue(joinResult.joinFuture.isDone()); |
| 18463 | + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), joinResult.joinFuture.get().errorCode()); |
| 18464 | + } |
| 18465 | + |
| 18466 | + @Test |
| 18467 | + public void testClassicGroupSyncOnStreamsGroup() throws Exception { |
| 18468 | + String groupId = "group-foo"; |
| 18469 | + String memberId = Uuid.randomUuid().toString(); |
| 18470 | + |
| 18471 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18472 | + .withMetadataImage(MetadataImage.EMPTY) |
| 18473 | + .withStreamsGroup(new StreamsGroupBuilder(groupId, 1) |
| 18474 | + .withMember(StreamsGroupMember.Builder.withDefaults(memberId) |
| 18475 | + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) |
| 18476 | + .setMemberEpoch(1) |
| 18477 | + .setPreviousMemberEpoch(0) |
| 18478 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18479 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18480 | + .build()) |
| 18481 | + .withTargetAssignment(memberId, TasksTuple.EMPTY) |
| 18482 | + .withTargetAssignmentEpoch(1)) |
| 18483 | + .build(); |
| 18484 | + |
| 18485 | + SyncGroupRequestData request = new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() |
| 18486 | + .withGroupId(groupId) |
| 18487 | + .withGenerationId(1) |
| 18488 | + .withMemberId(memberId) |
| 18489 | + .build(); |
| 18490 | + |
| 18491 | + GroupMetadataManagerTestContext.SyncResult syncResult = context.sendClassicGroupSync(request); |
| 18492 | + |
| 18493 | + assertTrue(syncResult.records.isEmpty()); |
| 18494 | + assertTrue(syncResult.syncFuture.isDone()); |
| 18495 | + assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), syncResult.syncFuture.get().errorCode()); |
| 18496 | + } |
| 18497 | + |
| 18498 | + @Test |
| 18499 | + public void testClassicGroupLeaveOnStreamsGroup() throws Exception { |
| 18500 | + String groupId = "group-foo"; |
| 18501 | + String memberId = Uuid.randomUuid().toString(); |
| 18502 | + |
| 18503 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18504 | + .withMetadataImage(MetadataImage.EMPTY) |
| 18505 | + .withStreamsGroup(new StreamsGroupBuilder(groupId, 1) |
| 18506 | + .withMember(StreamsGroupMember.Builder.withDefaults(memberId) |
| 18507 | + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) |
| 18508 | + .setMemberEpoch(1) |
| 18509 | + .setPreviousMemberEpoch(0) |
| 18510 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18511 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18512 | + .build()) |
| 18513 | + .withTargetAssignment(memberId, TasksTuple.EMPTY) |
| 18514 | + .withTargetAssignmentEpoch(1)) |
| 18515 | + .build(); |
| 18516 | + |
| 18517 | + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupLeave( |
| 18518 | + new LeaveGroupRequestData() |
| 18519 | + .setGroupId(groupId) |
| 18520 | + .setMembers(List.of( |
| 18521 | + new MemberIdentity() |
| 18522 | + .setMemberId(memberId))))); |
| 18523 | + } |
| 18524 | + |
| 18525 | + @Test |
| 18526 | + public void testConsumerGroupDescribeOnStreamsGroup() { |
| 18527 | + String groupId = "group-foo"; |
| 18528 | + String memberId = Uuid.randomUuid().toString(); |
| 18529 | + |
| 18530 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18531 | + .withMetadataImage(MetadataImage.EMPTY) |
| 18532 | + .withStreamsGroup(new StreamsGroupBuilder(groupId, 1) |
| 18533 | + .withMember(StreamsGroupMember.Builder.withDefaults(memberId) |
| 18534 | + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) |
| 18535 | + .setMemberEpoch(1) |
| 18536 | + .setPreviousMemberEpoch(0) |
| 18537 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18538 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18539 | + .build()) |
| 18540 | + .withTargetAssignment(memberId, TasksTuple.EMPTY) |
| 18541 | + .withTargetAssignmentEpoch(1)) |
| 18542 | + .build(); |
| 18543 | + |
| 18544 | + List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = List.of( |
| 18545 | + new ConsumerGroupDescribeResponseData.DescribedGroup() |
| 18546 | + .setGroupId(groupId) |
| 18547 | + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) |
| 18548 | + .setErrorMessage("Group " + groupId + " is not a consumer group.") |
| 18549 | + ); |
| 18550 | + |
| 18551 | + List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(List.of(groupId)); |
| 18552 | + assertEquals(expected, actual); |
| 18553 | + } |
| 18554 | + |
| 18555 | + @Test |
| 18556 | + public void testShareGroupDescribeOnStreamsGroup() { |
| 18557 | + String groupId = "group-foo"; |
| 18558 | + String memberId = Uuid.randomUuid().toString(); |
| 18559 | + |
| 18560 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18561 | + .withMetadataImage(MetadataImage.EMPTY) |
| 18562 | + .withStreamsGroup(new StreamsGroupBuilder(groupId, 1) |
| 18563 | + .withMember(StreamsGroupMember.Builder.withDefaults(memberId) |
| 18564 | + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) |
| 18565 | + .setMemberEpoch(1) |
| 18566 | + .setPreviousMemberEpoch(0) |
| 18567 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18568 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18569 | + .build()) |
| 18570 | + .withTargetAssignment(memberId, TasksTuple.EMPTY) |
| 18571 | + .withTargetAssignmentEpoch(1)) |
| 18572 | + .build(); |
| 18573 | + |
| 18574 | + List<ShareGroupDescribeResponseData.DescribedGroup> expected = List.of( |
| 18575 | + new ShareGroupDescribeResponseData.DescribedGroup() |
| 18576 | + .setGroupId(groupId) |
| 18577 | + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) |
| 18578 | + .setErrorMessage("Group " + groupId + " is not a share group.") |
| 18579 | + ); |
| 18580 | + |
| 18581 | + List<ShareGroupDescribeResponseData.DescribedGroup> actual = context.sendShareGroupDescribe(List.of(groupId)); |
| 18582 | + assertEquals(expected, actual); |
| 18583 | + } |
| 18584 | + |
| 18585 | + @Test |
| 18586 | + public void testStreamsGroupHeartbeatOnConsumerGroup() { |
| 18587 | + String groupId = "group-foo"; |
| 18588 | + // Use a static member id as it makes the test easier. |
| 18589 | + String memberId1 = Uuid.randomUuid().toString(); |
| 18590 | + |
| 18591 | + Uuid fooTopicId = Uuid.randomUuid(); |
| 18592 | + String fooTopicName = "foo"; |
| 18593 | + |
| 18594 | + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); |
| 18595 | + |
| 18596 | + // Consumer group with one static member. |
| 18597 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18598 | + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) |
| 18599 | + .withMetadataImage(new MetadataImageBuilder() |
| 18600 | + .addTopic(fooTopicId, fooTopicName, 6) |
| 18601 | + .build()) |
| 18602 | + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) |
| 18603 | + .withMember(new ConsumerGroupMember.Builder(memberId1) |
| 18604 | + .setState(MemberState.STABLE) |
| 18605 | + .setInstanceId(memberId1) |
| 18606 | + .setMemberEpoch(10) |
| 18607 | + .setPreviousMemberEpoch(9) |
| 18608 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18609 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18610 | + .setSubscribedTopicNames(List.of("foo", "bar")) |
| 18611 | + .setServerAssignorName("range") |
| 18612 | + .setAssignedPartitions(mkAssignment( |
| 18613 | + mkTopicAssignment(fooTopicId, 0, 1, 2))) |
| 18614 | + .build()) |
| 18615 | + .withAssignment(memberId1, mkAssignment( |
| 18616 | + mkTopicAssignment(fooTopicId, 0, 1, 2))) |
| 18617 | + .withAssignmentEpoch(10)) |
| 18618 | + .build(); |
| 18619 | + |
| 18620 | + assertThrows(GroupIdNotFoundException.class, () -> |
| 18621 | + context.streamsGroupHeartbeat( |
| 18622 | + new StreamsGroupHeartbeatRequestData() |
| 18623 | + .setGroupId(groupId) |
| 18624 | + .setMemberId(Uuid.randomUuid().toString()) |
| 18625 | + .setMemberEpoch(1))); |
| 18626 | + } |
| 18627 | + |
| 18628 | + @Test |
| 18629 | + public void testStreamsGroupDescribeOnConsumerGroup() { |
| 18630 | + String groupId = "group-foo"; |
| 18631 | + String memberId = Uuid.randomUuid().toString(); |
| 18632 | + |
| 18633 | + int epoch = 10; |
| 18634 | + String topicName = "topicName"; |
| 18635 | + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId) |
| 18636 | + .setSubscribedTopicNames(List.of(topicName)) |
| 18637 | + .setServerAssignorName("assignorName"); |
| 18638 | + |
| 18639 | + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); |
| 18640 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18641 | + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) |
| 18642 | + .withConsumerGroup(new ConsumerGroupBuilder(groupId, epoch) |
| 18643 | + .withMember(memberBuilder.build())) |
| 18644 | + .build(); |
| 18645 | + |
| 18646 | + List<StreamsGroupDescribeResponseData.DescribedGroup> expected = List.of( |
| 18647 | + new StreamsGroupDescribeResponseData.DescribedGroup() |
| 18648 | + .setGroupId(groupId) |
| 18649 | + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) |
| 18650 | + .setErrorMessage("Group " + groupId + " is not a streams group.") |
| 18651 | + ); |
| 18652 | + |
| 18653 | + List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(List.of(groupId)); |
| 18654 | + assertEquals(expected, actual); |
| 18655 | + } |
| 18656 | + |
| 18657 | + |
| 18658 | + @Test |
| 18659 | + public void testStreamsGroupHeartbeatOnShareGroup() { |
| 18660 | + String groupId = "group-foo"; |
| 18661 | + String memberId1 = Uuid.randomUuid().toString(); |
| 18662 | + |
| 18663 | + Uuid fooTopicId = Uuid.randomUuid(); |
| 18664 | + String fooTopicName = "foo"; |
| 18665 | + |
| 18666 | + // Share group with one member. |
| 18667 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18668 | + .withMetadataImage(new MetadataImageBuilder() |
| 18669 | + .addTopic(fooTopicId, fooTopicName, 6) |
| 18670 | + .build()) |
| 18671 | + .withShareGroup(new ShareGroupBuilder(groupId, 10) |
| 18672 | + .withMember(new ShareGroupMember.Builder(memberId1) |
| 18673 | + .setState(MemberState.STABLE) |
| 18674 | + .setMemberEpoch(10) |
| 18675 | + .setPreviousMemberEpoch(9) |
| 18676 | + .setClientId(DEFAULT_CLIENT_ID) |
| 18677 | + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) |
| 18678 | + .setSubscribedTopicNames(List.of("foo", "bar")) |
| 18679 | + .setAssignedPartitions(mkAssignment( |
| 18680 | + mkTopicAssignment(fooTopicId, 0, 1, 2))) |
| 18681 | + .build()) |
| 18682 | + .withAssignment(memberId1, mkAssignment( |
| 18683 | + mkTopicAssignment(fooTopicId, 0, 1, 2))) |
| 18684 | + .withAssignmentEpoch(10)) |
| 18685 | + .build(); |
| 18686 | + |
| 18687 | + assertThrows(GroupIdNotFoundException.class, () -> |
| 18688 | + context.streamsGroupHeartbeat( |
| 18689 | + new StreamsGroupHeartbeatRequestData() |
| 18690 | + .setGroupId(groupId) |
| 18691 | + .setMemberId(Uuid.randomUuid().toString()) |
| 18692 | + .setMemberEpoch(1))); |
| 18693 | + } |
| 18694 | + |
| 18695 | + @Test |
| 18696 | + public void testStreamsGroupDescribeOnShareGroup() { |
| 18697 | + String groupId = "group-foo"; |
| 18698 | + String memberId = Uuid.randomUuid().toString(); |
| 18699 | + |
| 18700 | + int epoch = 10; |
| 18701 | + String topicName = "topicName"; |
| 18702 | + ShareGroupMember.Builder memberBuilder = new ShareGroupMember.Builder(memberId) |
| 18703 | + .setSubscribedTopicNames(List.of(topicName)); |
| 18704 | + |
| 18705 | + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() |
| 18706 | + .withShareGroup(new ShareGroupBuilder(groupId, epoch) |
| 18707 | + .withMember(memberBuilder.build())) |
| 18708 | + .build(); |
| 18709 | + |
| 18710 | + List<StreamsGroupDescribeResponseData.DescribedGroup> expected = List.of( |
| 18711 | + new StreamsGroupDescribeResponseData.DescribedGroup() |
| 18712 | + .setGroupId(groupId) |
| 18713 | + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) |
| 18714 | + .setErrorMessage("Group " + groupId + " is not a streams group.") |
| 18715 | + ); |
| 18716 | + |
| 18717 | + List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(List.of(groupId)); |
| 18718 | + assertEquals(expected, actual); |
| 18719 | + } |
| 18720 | + |
18377 | 18721 | @Test
|
18378 | 18722 | public void testReplayConsumerGroupRegularExpression() {
|
18379 | 18723 | GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
|
0 commit comments