-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-18761: [2/N] List share group offsets with state and auth #19328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, LGTM.
if (exception != null) { | ||
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup() | ||
.setGroupId(groupDescribeOffsetsRequest.groupId) | ||
.setErrorCode(Errors.forException(exception).code) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could we create a val Error and use that here
} | ||
}); | ||
return readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets add an exceptionally block as well and corresponding unit test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already exception handling inside readShareGroupStateSummary
and there is unit testing of it (testDescribeShareGroupAllOffsetsThrowsError
for example).
Thanks @AndrewJSchofield for the PR. |
groupDescribeOffsetsRequest: DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup | ||
): CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] = { | ||
groupCoordinator.describeShareGroupOffsets( | ||
groupCoordinator.describeShareGroupAllOffsets( | ||
requestContext, | ||
groupDescribeOffsetsRequest | ||
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] { (groupDescribeOffsetsResponse, exception) => | ||
if (exception != null) { | ||
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup() | ||
.setGroupId(groupDescribeOffsetsRequest.groupId) | ||
.setErrorCode(Errors.forException(exception).code) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could we declare a new val instead of Errors lookup twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just two non-blocking questions
@@ -47,9 +46,10 @@ public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topi | |||
|
|||
/** | |||
* Returns the topic partitions whose offsets are to be listed for a share group. | |||
* {@code null} indicates that offsets of all partitions of the group are to be listed. | |||
*/ | |||
public Collection<TopicPartition> topicPartitions() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do we do elsewhere in the admin for "all" vs "none"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, but this is precisely the same behaviour as ListConsumerGroupOffsetsSpec
. Essentially, if you do new ListShareGroupOffsetsSpec()
you get all of them. If you do new ListShareGroupOffsetsSpec().topicPartitions(....)
then you get a smaller list. There's no way to ask for none.
.add(tp.partition()); | ||
if (spec.topicPartitions() != null) { | ||
Map<String, List<Integer>> topicPartitionMap = new HashMap<>(); | ||
spec.topicPartitions().forEach(tp -> topicPartitionMap.computeIfAbsent(tp.topic(), t -> new LinkedList<>()).add(tp.partition())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realize it's unrelated to this PR, but why are we using a LinkedList here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No particular reason I think. I've changed it to ArrayList but it's fairly arbitrary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, great patch
This PR approaches completion of Admin.listShareGroupOffsets() and kafka-share-groups.sh --describe --offsets.
Prior to this patch, kafka-share-groups.sh was only able to describe the offsets for partitions which were assigned to active members. Now, the Admin.listShareGroupOffsets() uses the persister's knowledge of the share-partitions which have initialised state. Then, it uses this list to obtain a complete set of offset information.
The PR also implements the topic-based authorisation checking. If Admin.listShareGroupOffsets() is called with a list of topic-partitions specified, the authz checking is performed on the supplied list, returning errors for any topics to which the client is not authorised. If Admin.listShareGroupOffsets() is called without a list of topic-partitions specified, the list of topics is discovered from the persister as described above, and then the response is filtered down to only show the topics to which the client is authorised. This is consistent with other similar RPCs in the Kafka protocol, such as OffsetFetch.