-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-18613: Add StreamsGroupHeartbeat handler in the group coordinator #19114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
...oordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
Basic streams group heartbeat handling. - No support for static membership - No support for configurations (using constants instead) - No support for regular expressions
b442946
to
fccd6e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @lucasbru !
I did a first pass over the production code. I haven't looked at GroupMetadataTest
, yet.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java
Show resolved
Hide resolved
@cadonna Thanks for the comments! Ready for re-review |
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @lucasbru !
I reviewed half of the tests in GroupMetadataTest
.
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cadonna Thanks for the comments! Ready for re-review.
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @lucasbru, I a few comments, but otherwise this lgtm
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
@bbejeck Thanks for the comments! Ready for re-review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, @lucasbru !
I reviewed also the second part of the tests.
Once you resolved to my comments, the PR is ready for merge.
.withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, | ||
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) | ||
.withTargetAssignmentEpoch(10) | ||
.withTopology(StreamsTopology.fromHeartbeatRequest(topology)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test would be easier to understand if you added the following here:
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 3),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
))
or even add a variable changedPartitionCount = 3
and then:
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, changedPartitionCount),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
))
It took me quite some time to understand in the partition metadata was from no metadata to some metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. It makes sense, but I changed the number of partitions in MetadataImage
instead, as that is where a change would come from. I also changed the number of partitions of bar
instead of foo
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, sure. My bad! I meant it as you did.
} | ||
|
||
@Test | ||
public void testStreamsLeavingMemberBumpsGroupEpoch() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a better name would be testStreamsLeavingMemberRemovesMemberAndBumpsGroupEpoch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5), | ||
TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) | ||
.withTargetAssignmentEpoch(10)) | ||
.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, you should also add the partition metadata to the group. The test relies on the order of checking first the member changes and then partition metadata changes. In the case the production code changes that order this test would fail in this specific location although it should not. That is better than the other way around, i.e., the test does not fail although it should. Nevertheless, making the test more robust to production code changes saves some debugging pain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
// Member 1 joins the streams group. The request fails because the | ||
// target assignment computation failed. | ||
assertThrows(UnknownServerException.class, () -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please also verify the exception message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
context.rollback(); | ||
|
||
// However, the next heartbeat should detect the divergence based on the epoch and trigger | ||
// a metadata refr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
// a metadata refr | |
// a metadata refresh. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
); | ||
|
||
// Advance time past the revocation timeout. | ||
List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(10000 + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please use a variable revocationTimeout
instead of 10000?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @lucasbru LGTM modulo addressing Bruno's oustanding comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cadonna I addressed your comments
); | ||
|
||
// Advance time past the revocation timeout. | ||
List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(10000 + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
context.rollback(); | ||
|
||
// However, the next heartbeat should detect the divergence based on the epoch and trigger | ||
// a metadata refr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
// Member 1 joins the streams group. The request fails because the | ||
// target assignment computation failed. | ||
assertThrows(UnknownServerException.class, () -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5), | ||
TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) | ||
.withTargetAssignmentEpoch(10)) | ||
.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
@Test | ||
public void testStreamsLeavingMemberBumpsGroupEpoch() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
.withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, | ||
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) | ||
.withTargetAssignmentEpoch(10) | ||
.withTopology(StreamsTopology.fromHeartbeatRequest(topology)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. It makes sense, but I changed the number of partitions in MetadataImage
instead, as that is where a change would come from. I also changed the number of partitions of bar
instead of foo
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @lucasbru for the updates!
LGTM!
…or (apache#19114) Basic streams group heartbeat handling. The main part of are the unit tests that make sure that we behave, for the most part, like a consumer group. - No support for static membership - No support for configurations (using constants instead) - No support for regular expressions Reviewers: Bill Bejeck <[email protected]>, Bruno Cadonna <[email protected]>
Basic streams group heartbeat handling. The main part of are the unit tests that make sure that we behave, for the most part, like a consumer group.
Reviewers: Bill Bejeck [email protected], Bruno Cadonna [email protected]