Skip to content

Commit d4d9f11

Browse files
KAFKA-18761: [2/N] List share group offsets with state and auth (#19328)
This PR approaches completion of Admin.listShareGroupOffsets() and kafka-share-groups.sh --describe --offsets. Prior to this patch, kafka-share-groups.sh was only able to describe the offsets for partitions which were assigned to active members. Now, the Admin.listShareGroupOffsets() uses the persister's knowledge of the share-partitions which have initialised state. Then, it uses this list to obtain a complete set of offset information. The PR also implements the topic-based authorisation checking. If Admin.listShareGroupOffsets() is called with a list of topic-partitions specified, the authz checking is performed on the supplied list, returning errors for any topics to which the client is not authorised. If Admin.listShareGroupOffsets() is called without a list of topic-partitions specified, the list of topics is discovered from the persister as described above, and then the response is filtered down to only show the topics to which the client is authorised. This is consistent with other similar RPCs in the Kafka protocol, such as OffsetFetch. Reviewers: David Arthur <[email protected]>, Sushant Mahajan <[email protected]>, Apoorv Mittal <[email protected]>
1 parent 98c0f30 commit d4d9f11

File tree

13 files changed

+915
-77
lines changed

13 files changed

+915
-77
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.kafka.common.annotation.InterfaceStability;
2222

2323
import java.util.Collection;
24-
import java.util.List;
2524
import java.util.Map;
2625
import java.util.Objects;
2726

@@ -47,9 +46,10 @@ public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topi
4746

4847
/**
4948
* Returns the topic partitions whose offsets are to be listed for a share group.
49+
* {@code null} indicates that offsets of all partitions of the group are to be listed.
5050
*/
5151
public Collection<TopicPartition> topicPartitions() {
52-
return topicPartitions == null ? List.of() : topicPartitions;
52+
return topicPartitions;
5353
}
5454

5555
@Override

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java

+16-13
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.ArrayList;
3838
import java.util.Collection;
3939
import java.util.HashMap;
40-
import java.util.LinkedList;
4140
import java.util.List;
4241
import java.util.Map;
4342
import java.util.Set;
@@ -84,19 +83,23 @@ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordina
8483
DescribeShareGroupOffsetsRequestGroup requestGroup = new DescribeShareGroupOffsetsRequestGroup()
8584
.setGroupId(groupId);
8685

87-
Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
88-
spec.topicPartitions().forEach(tp -> topicPartitionMap.computeIfAbsent(tp.topic(), t -> new LinkedList<>()).add(tp.partition()));
89-
90-
Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics = new HashMap<>();
91-
for (TopicPartition tp : spec.topicPartitions()) {
92-
requestTopics.computeIfAbsent(tp.topic(), t ->
93-
new DescribeShareGroupOffsetsRequestTopic()
94-
.setTopicName(tp.topic())
95-
.setPartitions(new LinkedList<>()))
96-
.partitions()
97-
.add(tp.partition());
86+
if (spec.topicPartitions() != null) {
87+
Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
88+
spec.topicPartitions().forEach(tp -> topicPartitionMap.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition()));
89+
90+
Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics = new HashMap<>();
91+
for (TopicPartition tp : spec.topicPartitions()) {
92+
requestTopics.computeIfAbsent(tp.topic(), t ->
93+
new DescribeShareGroupOffsetsRequestTopic()
94+
.setTopicName(tp.topic())
95+
.setPartitions(new ArrayList<>()))
96+
.partitions()
97+
.add(tp.partition());
98+
}
99+
requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
100+
} else {
101+
requestGroup.setTopics(null);
98102
}
99-
requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
100103
groups.add(requestGroup);
101104
});
102105
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()

clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
"about": "The groups to describe offsets for.", "fields": [
2727
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
2828
"about": "The group identifier." },
29-
{ "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
30-
"about": "The topics to describe offsets for.", "fields": [
29+
{ "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+", "nullableVersions": "0+",
30+
"about": "The topics to describe offsets for, or null for all topic-partitions.", "fields": [
3131
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
3232
"about": "The topic name." },
3333
{ "name": "Partitions", "type": "[]int32", "versions": "0+",

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -10559,9 +10559,7 @@ public void testListShareGroupOffsets() throws Exception {
1055910559
TopicPartition myTopicPartition4 = new TopicPartition("my_topic_1", 4);
1056010560
TopicPartition myTopicPartition5 = new TopicPartition("my_topic_2", 6);
1056110561

10562-
ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec().topicPartitions(
10563-
List.of(myTopicPartition0, myTopicPartition1, myTopicPartition2, myTopicPartition3, myTopicPartition4, myTopicPartition5)
10564-
);
10562+
ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec();
1056510563
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
1056610564
groupSpecs.put(GROUP_ID, groupSpec);
1056710565

core/src/main/scala/kafka/server/KafkaApis.scala

+66-3
Original file line numberDiff line numberDiff line change
@@ -3508,6 +3508,7 @@ class KafkaApis(val requestChannel: RequestChannel,
35083508

35093509
val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]](groups.size)
35103510
groups.forEach { groupDescribeOffsets =>
3511+
val isAllPartitions = groupDescribeOffsets.topics == null
35113512
if (!isShareGroupProtocolEnabled) {
35123513
futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
35133514
.setGroupId(groupDescribeOffsets.groupId)
@@ -3516,6 +3517,11 @@ class KafkaApis(val requestChannel: RequestChannel,
35163517
futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
35173518
.setGroupId(groupDescribeOffsets.groupId)
35183519
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
3520+
} else if (isAllPartitions) {
3521+
futures += describeShareGroupAllOffsetsForGroup(
3522+
request.context,
3523+
groupDescribeOffsets
3524+
)
35193525
} else if (groupDescribeOffsets.topics.isEmpty) {
35203526
futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
35213527
.setGroupId(groupDescribeOffsets.groupId))
@@ -3535,19 +3541,76 @@ class KafkaApis(val requestChannel: RequestChannel,
35353541
}
35363542
}
35373543

3538-
private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
3544+
private def describeShareGroupAllOffsetsForGroup(requestContext: RequestContext,
35393545
groupDescribeOffsetsRequest: DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
35403546
): CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] = {
3541-
groupCoordinator.describeShareGroupOffsets(
3547+
groupCoordinator.describeShareGroupAllOffsets(
35423548
requestContext,
35433549
groupDescribeOffsetsRequest
35443550
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] { (groupDescribeOffsetsResponse, exception) =>
35453551
if (exception != null) {
3552+
val error = Errors.forException(exception)
35463553
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
35473554
.setGroupId(groupDescribeOffsetsRequest.groupId)
3548-
.setErrorCode(Errors.forException(exception).code)
3555+
.setErrorCode(error.code)
3556+
.setErrorMessage(error.message)
35493557
} else {
3558+
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
3559+
val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
3560+
requestContext,
3561+
DESCRIBE,
3562+
TOPIC,
3563+
groupDescribeOffsetsResponse.topics.asScala
3564+
)(_.topicName)
3565+
groupDescribeOffsetsResponse.setTopics(authorizedOffsets.asJava)
3566+
}
3567+
}
3568+
}
3569+
3570+
private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
3571+
groupDescribeOffsetsRequest: DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
3572+
): CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] = {
3573+
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
3574+
val (authorizedTopics, unauthorizedTopics) = authHelper.partitionSeqByAuthorized(
3575+
requestContext,
3576+
DESCRIBE,
3577+
TOPIC,
3578+
groupDescribeOffsetsRequest.topics.asScala
3579+
)(_.topicName)
3580+
3581+
groupCoordinator.describeShareGroupOffsets(
3582+
requestContext,
3583+
new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
3584+
.setGroupId(groupDescribeOffsetsRequest.groupId)
3585+
.setTopics(authorizedTopics.asJava)
3586+
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup] { (groupDescribeOffsetsResponse, exception) =>
3587+
if (exception != null) {
3588+
val error = Errors.forException(exception)
3589+
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
3590+
.setGroupId(groupDescribeOffsetsRequest.groupId)
3591+
.setErrorCode(error.code)
3592+
.setErrorMessage(error.message)
3593+
} else if (groupDescribeOffsetsResponse.errorCode() != Errors.NONE.code) {
35503594
groupDescribeOffsetsResponse
3595+
} else {
3596+
val topics = new util.ArrayList[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic](
3597+
groupDescribeOffsetsResponse.topics.size + unauthorizedTopics.size
3598+
)
3599+
topics.addAll(groupDescribeOffsetsResponse.topics)
3600+
unauthorizedTopics.foreach { topic =>
3601+
val topicResponse = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
3602+
.setTopicName(topic.topicName)
3603+
.setTopicId(Uuid.ZERO_UUID)
3604+
topic.partitions().forEach { partitionIndex =>
3605+
topicResponse.partitions.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
3606+
.setPartitionIndex(partitionIndex)
3607+
.setStartOffset(-1)
3608+
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
3609+
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
3610+
}
3611+
topics.add(topicResponse)
3612+
}
3613+
groupDescribeOffsetsResponse.setTopics(topics)
35513614
}
35523615
}
35533616
}

0 commit comments

Comments
 (0)