Skip to content

Commit 5148174

Browse files
KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for DeleteShareGroupOffsets RPC (#18976)
This PR contains the implementation of KafkaAdminClient and GroupCoordinator for DeleteShareGroupOffsets RPC. - Added `deleteShareGroupOffsets` to `KafkaAdminClient` - Added implementation for `handleDeleteShareGroupOffsetsRequest` in `KafkaApis.scala` - Added `deleteShareGroupOffsets` to `GroupCoordinator` as well. internally this makes use of `persister.deleteState` to persist the changes in share coordinator Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan <[email protected]>
1 parent 43e22ef commit 5148174

File tree

18 files changed

+1680
-24
lines changed

18 files changed

+1680
-24
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

+30-5
Original file line numberDiff line numberDiff line change
@@ -1947,13 +1947,28 @@ default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareG
19471947
}
19481948

19491949
/**
1950-
* Delete share groups from the cluster with the default options.
1950+
* Delete offsets for a set of partitions in a share group.
19511951
*
1952-
* @param groupIds Collection of share group ids which are to be deleted.
1953-
* @return The DeleteShareGroupsResult.
1952+
* @param groupId The group for which to delete offsets.
1953+
* @param partitions The topic-partitions.
1954+
* @param options The options to use when deleting offsets in a share group.
1955+
* @return The DeleteShareGroupOffsetsResult.
19541956
*/
1955-
default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
1956-
return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
1957+
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options);
1958+
1959+
/**
1960+
* Delete offsets for a set of partitions in a share group with the default options.
1961+
*
1962+
* <p>
1963+
* This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
1964+
* See the overload for more details.
1965+
*
1966+
* @param groupId The group for which to delete offsets.
1967+
* @param partitions The topic-partitions.
1968+
* @return The DeleteShareGroupOffsetsResult.
1969+
*/
1970+
default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
1971+
return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
19571972
}
19581973

19591974
/**
@@ -1965,6 +1980,16 @@ default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
19651980
*/
19661981
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);
19671982

1983+
/**
1984+
* Delete share groups from the cluster with the default options.
1985+
*
1986+
* @param groupIds Collection of share group ids which are to be deleted.
1987+
* @return The DeleteShareGroupsResult.
1988+
*/
1989+
default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
1990+
return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
1991+
}
1992+
19681993
/**
19691994
* Describe streams groups in the cluster.
19701995
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients.admin;
19+
20+
import org.apache.kafka.common.annotation.InterfaceStability;
21+
22+
import java.util.Set;
23+
24+
/**
25+
* Options for the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
26+
* <p>
27+
* The API of this class is evolving, see {@link Admin} for details.
28+
*/
29+
@InterfaceStability.Evolving
30+
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.KafkaFuture;
20+
import org.apache.kafka.common.TopicPartition;
21+
import org.apache.kafka.common.annotation.InterfaceStability;
22+
import org.apache.kafka.common.errors.ApiException;
23+
import org.apache.kafka.common.internals.KafkaFutureImpl;
24+
25+
import java.util.Map;
26+
import java.util.Set;
27+
28+
/**
29+
* The result of the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
30+
* <p>
31+
* The API of this class is evolving, see {@link Admin} for details.
32+
*/
33+
@InterfaceStability.Evolving
34+
public class DeleteShareGroupOffsetsResult {
35+
36+
private final KafkaFuture<Map<TopicPartition, ApiException>> future;
37+
private final Set<TopicPartition> partitions;
38+
39+
DeleteShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, ApiException>> future, Set<TopicPartition> partitions) {
40+
this.future = future;
41+
this.partitions = partitions;
42+
}
43+
44+
/**
45+
* Return a future which succeeds only if all the deletions succeed.
46+
* If not, the first partition error shall be returned.
47+
*/
48+
public KafkaFuture<Void> all() {
49+
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
50+
51+
this.future.whenComplete((topicPartitions, throwable) -> {
52+
if (throwable != null) {
53+
result.completeExceptionally(throwable);
54+
} else {
55+
for (TopicPartition partition : partitions) {
56+
if (maybeCompleteExceptionally(topicPartitions, partition, result)) {
57+
return;
58+
}
59+
}
60+
result.complete(null);
61+
}
62+
});
63+
return result;
64+
}
65+
66+
/**
67+
* Return a future which can be used to check the result for a given partition.
68+
*/
69+
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
70+
if (!partitions.contains(partition)) {
71+
throw new IllegalArgumentException("Partition " + partition + " was not included in the original request");
72+
}
73+
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
74+
75+
this.future.whenComplete((topicPartitions, throwable) -> {
76+
if (throwable != null) {
77+
result.completeExceptionally(throwable);
78+
} else if (!maybeCompleteExceptionally(topicPartitions, partition, result)) {
79+
result.complete(null);
80+
}
81+
});
82+
return result;
83+
}
84+
85+
private boolean maybeCompleteExceptionally(Map<TopicPartition, ApiException> partitionLevelErrors,
86+
TopicPartition partition,
87+
KafkaFutureImpl<Void> result) {
88+
Throwable exception;
89+
if (!partitionLevelErrors.containsKey(partition)) {
90+
exception = new IllegalArgumentException("Offset deletion result for partition \"" + partition + "\" was not included in the response");
91+
} else {
92+
exception = partitionLevelErrors.get(partition);
93+
}
94+
95+
if (exception != null) {
96+
result.completeExceptionally(exception);
97+
return true;
98+
} else {
99+
return false;
100+
}
101+
}
102+
}

clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java

+5
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,11 @@ public ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGr
333333
return delegate.listShareGroupOffsets(groupSpecs, options);
334334
}
335335

336+
@Override
337+
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
338+
return delegate.deleteShareGroupOffsets(groupId, partitions, options);
339+
}
340+
336341
@Override
337342
public DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
338343
return delegate.deleteShareGroups(groupIds, options);

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
5151
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
5252
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
53+
import org.apache.kafka.clients.admin.internals.DeleteShareGroupOffsetsHandler;
5354
import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler;
5455
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
5556
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
@@ -3841,6 +3842,14 @@ public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListS
38413842
return new ListShareGroupOffsetsResult(future.all());
38423843
}
38433844

3845+
@Override
3846+
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
3847+
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
3848+
DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, partitions, logContext);
3849+
invokeDriver(handler, future, options.timeoutMs);
3850+
return new DeleteShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), partitions);
3851+
}
3852+
38443853
@Override
38453854
public DescribeStreamsGroupsResult describeStreamsGroups(final Collection<String> groupIds,
38463855
final DescribeStreamsGroupsOptions options) {
@@ -3851,7 +3860,7 @@ public DescribeStreamsGroupsResult describeStreamsGroups(final Collection<String
38513860
return new DescribeStreamsGroupsResult(future.all().entrySet().stream()
38523861
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
38533862
}
3854-
3863+
38553864
@Override
38563865
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
38573866
final DescribeClassicGroupsOptions options) {

0 commit comments

Comments
 (0)