Skip to content

Commit 8d66481

Browse files
KAFKA-17897 Deprecate Admin.listConsumerGroups (#19477)
The final part of KIP-1043 is to deprecate Admin.listConsumerGroups() in favour of Admin.listGroups() which works for all group types. Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 6f5be29 commit 8d66481

File tree

17 files changed

+141
-88
lines changed

17 files changed

+141
-88
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

+4
Original file line numberDiff line numberDiff line change
@@ -879,20 +879,24 @@ default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> g
879879

880880
/**
881881
* List the consumer groups available in the cluster.
882+
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
882883
*
883884
* @param options The options to use when listing the consumer groups.
884885
* @return The ListConsumerGroupsResult.
885886
*/
887+
@Deprecated(since = "4.1", forRemoval = true)
886888
ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
887889

888890
/**
889891
* List the consumer groups available in the cluster with the default options.
890892
* <p>
891893
* This is a convenience method for {@link #listConsumerGroups(ListConsumerGroupsOptions)} with default options.
892894
* See the overload for more details.
895+
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
893896
*
894897
* @return The ListConsumerGroupsResult.
895898
*/
899+
@Deprecated(since = "4.1", forRemoval = true)
896900
default ListConsumerGroupsResult listConsumerGroups() {
897901
return listConsumerGroups(new ListConsumerGroupsOptions());
898902
}

clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java

+2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626

2727
/**
2828
* A listing of a consumer group in the cluster.
29+
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} and {@link GroupListing} instead.
2930
*/
31+
@Deprecated(since = "4.1")
3032
public class ConsumerGroupListing {
3133
private final String groupId;
3234
private final boolean isSimpleConsumerGroup;

clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java

+2
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> gr
159159
}
160160

161161
@Override
162+
@Deprecated
163+
@SuppressWarnings("removal")
162164
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
163165
return delegate.listConsumerGroups(options);
164166
}

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

+3
Original file line numberDiff line numberDiff line change
@@ -3614,6 +3614,7 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri
36143614
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
36153615
}
36163616

3617+
@Deprecated
36173618
private static final class ListConsumerGroupsResults {
36183619
private final List<Throwable> errors;
36193620
private final HashMap<String, ConsumerGroupListing> listings;
@@ -3657,6 +3658,8 @@ private synchronized void tryComplete() {
36573658
}
36583659

36593660
@Override
3661+
@SuppressWarnings("removal")
3662+
@Deprecated(since = "4.1", forRemoval = true)
36603663
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
36613664
final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
36623665
final long nowMetadata = time.milliseconds();

clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java

+3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828

2929
/**
3030
* Options for {@link Admin#listConsumerGroups()}.
31+
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
3132
*/
33+
@Deprecated(since = "4.1")
34+
@SuppressWarnings("removal")
3235
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
3336

3437
private Set<GroupState> groupStates = Collections.emptySet();

clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525

2626
/**
2727
* The result of the {@link Admin#listConsumerGroups()} call.
28-
* <p>
28+
* @deprecated Since 4.1. Use {@link Admin#listGroups(ListGroupsOptions)} instead.
2929
*/
30+
@Deprecated(since = "4.1")
31+
@SuppressWarnings("removal")
3032
public class ListConsumerGroupsResult {
3133
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> all;
3234
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;

clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java

+27-4
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package org.apache.kafka.clients.admin;
1919

20+
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
2021
import org.apache.kafka.common.GroupState;
2122
import org.apache.kafka.common.GroupType;
2223
import org.apache.kafka.common.annotation.InterfaceStability;
2324

24-
import java.util.Collections;
2525
import java.util.Set;
2626

2727
/**
@@ -32,16 +32,32 @@
3232
@InterfaceStability.Evolving
3333
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
3434

35-
private Set<GroupState> groupStates = Collections.emptySet();
36-
private Set<GroupType> types = Collections.emptySet();
35+
private Set<GroupState> groupStates = Set.of();
36+
private Set<GroupType> types = Set.of();
37+
private Set<String> protocolTypes = Set.of();
38+
39+
/**
40+
* Only consumer groups will be returned by listGroups().
41+
* This operation sets filters on group type and protocol type which select consumer groups.
42+
*/
43+
public static ListGroupsOptions forConsumerGroups() {
44+
return new ListGroupsOptions()
45+
.withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER))
46+
.withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE));
47+
}
3748

3849
/**
3950
* If groupStates is set, only groups in these states will be returned by listGroups().
4051
* Otherwise, all groups are returned.
4152
* This operation is supported by brokers with version 2.6.0 or later.
4253
*/
4354
public ListGroupsOptions inGroupStates(Set<GroupState> groupStates) {
44-
this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Collections.emptySet() : Set.copyOf(groupStates);
55+
this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Set.of() : Set.copyOf(groupStates);
56+
return this;
57+
}
58+
59+
public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) {
60+
this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes);
4561
return this;
4662
}
4763

@@ -61,6 +77,13 @@ public Set<GroupState> groupStates() {
6177
return groupStates;
6278
}
6379

80+
/**
81+
* Returns the list of protocol types that are requested or empty if no protocol types have been specified.
82+
*/
83+
public Set<String> protocolTypes() {
84+
return protocolTypes;
85+
}
86+
6487
/**
6588
* Returns the list of group types that are requested or empty if no types have been specified.
6689
*/

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -3268,6 +3268,7 @@ public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers
32683268
}
32693269

32703270
@Test
3271+
@SuppressWarnings("removal")
32713272
public void testListConsumerGroups() throws Exception {
32723273
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
32733274
AdminClientConfig.RETRIES_CONFIG, "2")) {
@@ -3377,6 +3378,7 @@ public void testListConsumerGroups() throws Exception {
33773378
}
33783379

33793380
@Test
3381+
@SuppressWarnings("removal")
33803382
public void testListConsumerGroupsMetadataFailure() throws Exception {
33813383
final Cluster cluster = mockCluster(3, 0);
33823384
final Time time = new MockTime();
@@ -3400,6 +3402,7 @@ public void testListConsumerGroupsMetadataFailure() throws Exception {
34003402
}
34013403

34023404
@Test
3405+
@SuppressWarnings("removal")
34033406
public void testListConsumerGroupsWithStates() throws Exception {
34043407
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
34053408
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -3433,6 +3436,7 @@ public void testListConsumerGroupsWithStates() throws Exception {
34333436
}
34343437

34353438
@Test
3439+
@SuppressWarnings("removal")
34363440
public void testListConsumerGroupsWithTypes() throws Exception {
34373441
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
34383442
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -3495,6 +3499,7 @@ public void testListConsumerGroupsWithTypes() throws Exception {
34953499
}
34963500

34973501
@Test
3502+
@SuppressWarnings("removal")
34983503
public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception {
34993504
ApiVersion listGroupV3 = new ApiVersion()
35003505
.setApiKey(ApiKeys.LIST_GROUPS.id)
@@ -3533,6 +3538,7 @@ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio
35333538
}
35343539

35353540
@Test
3541+
@SuppressWarnings("removal")
35363542
public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception {
35373543
ApiVersion listGroupV4 = new ApiVersion()
35383544
.setApiKey(ApiKeys.LIST_GROUPS.id)

clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

+1
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,7 @@ public synchronized DescribeConsumerGroupsResult describeConsumerGroups(Collecti
735735
}
736736

737737
@Override
738+
@SuppressWarnings("removal")
738739
public synchronized ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
739740
KafkaFutureImpl<Collection<Object>> future = new KafkaFutureImpl<>();
740741
future.complete(groupConfigs.keySet().stream().map(g -> new ConsumerGroupListing(g, false)).collect(Collectors.toList()));

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
package org.apache.kafka.connect.mirror;
1818

1919
import org.apache.kafka.clients.admin.Admin;
20-
import org.apache.kafka.clients.admin.ConsumerGroupListing;
20+
import org.apache.kafka.clients.admin.GroupListing;
2121
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
22+
import org.apache.kafka.clients.admin.ListGroupsOptions;
2223
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2324
import org.apache.kafka.common.TopicPartition;
2425
import org.apache.kafka.common.config.Config;
@@ -225,7 +226,7 @@ private void loadInitialConsumerGroups()
225226
Set<String> findConsumerGroups()
226227
throws InterruptedException, ExecutionException {
227228
List<String> filteredGroups = listConsumerGroups().stream()
228-
.map(ConsumerGroupListing::groupId)
229+
.map(GroupListing::groupId)
229230
.filter(this::shouldReplicateByGroupFilter)
230231
.collect(Collectors.toList());
231232

@@ -252,10 +253,10 @@ Set<String> findConsumerGroups()
252253
return checkpointGroups;
253254
}
254255

255-
Collection<ConsumerGroupListing> listConsumerGroups()
256+
Collection<GroupListing> listConsumerGroups()
256257
throws InterruptedException, ExecutionException {
257258
return adminCall(
258-
() -> sourceAdminClient.listConsumerGroups().valid().get(),
259+
() -> sourceAdminClient.listGroups(ListGroupsOptions.forConsumerGroups()).valid().get(),
259260
() -> "list consumer groups on " + config.sourceClusterAlias() + " cluster"
260261
);
261262
}

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
*/
1717
package org.apache.kafka.connect.mirror;
1818

19-
import org.apache.kafka.clients.admin.ConsumerGroupListing;
19+
import org.apache.kafka.clients.admin.GroupListing;
2020
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
21+
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
22+
import org.apache.kafka.common.GroupType;
2123
import org.apache.kafka.common.TopicPartition;
2224
import org.apache.kafka.connect.errors.ConnectException;
2325
import org.apache.kafka.connect.errors.RetriableException;
@@ -31,6 +33,7 @@
3133
import java.util.HashSet;
3234
import java.util.List;
3335
import java.util.Map;
36+
import java.util.Optional;
3437
import java.util.Set;
3538
import java.util.function.Function;
3639
import java.util.stream.Collectors;
@@ -144,9 +147,9 @@ public void testFindConsumerGroups() throws Exception {
144147
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
145148
connector = spy(connector);
146149

147-
Collection<ConsumerGroupListing> groups = Arrays.asList(
148-
new ConsumerGroupListing("g1", true),
149-
new ConsumerGroupListing("g2", false));
150+
Collection<GroupListing> groups = Arrays.asList(
151+
new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()),
152+
new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()));
150153
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
151154
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
152155
doReturn(groups).when(connector).listConsumerGroups();
@@ -159,7 +162,7 @@ public void testFindConsumerGroups() throws Exception {
159162
doReturn(groupToOffsets).when(connector).listConsumerGroupOffsets(anyList());
160163
Set<String> groupFound = connector.findConsumerGroups();
161164

162-
Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
165+
Set<String> expectedGroups = groups.stream().map(GroupListing::groupId).collect(Collectors.toSet());
163166
assertEquals(expectedGroups, groupFound,
164167
"Expected groups are not the same as findConsumerGroups");
165168

@@ -174,11 +177,11 @@ public void testFindConsumerGroupsInCommonScenarios() throws Exception {
174177
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
175178
connector = spy(connector);
176179

177-
Collection<ConsumerGroupListing> groups = Arrays.asList(
178-
new ConsumerGroupListing("g1", true),
179-
new ConsumerGroupListing("g2", false),
180-
new ConsumerGroupListing("g3", false),
181-
new ConsumerGroupListing("g4", false));
180+
Collection<GroupListing> groups = Arrays.asList(
181+
new GroupListing("g1", Optional.of(GroupType.CLASSIC), "", Optional.empty()),
182+
new GroupListing("g2", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()),
183+
new GroupListing("g3", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()),
184+
new GroupListing("g4", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()));
182185
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup1 = new HashMap<>();
183186
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup2 = new HashMap<>();
184187
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup3 = new HashMap<>();

connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
import org.apache.kafka.clients.CommonClientConfigs;
2020
import org.apache.kafka.clients.admin.Admin;
21-
import org.apache.kafka.clients.admin.ConsumerGroupListing;
21+
import org.apache.kafka.clients.admin.GroupListing;
22+
import org.apache.kafka.clients.admin.ListGroupsOptions;
2223
import org.apache.kafka.common.test.api.Flaky;
2324
import org.apache.kafka.connect.runtime.ConnectorConfig;
2425
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
@@ -184,7 +185,7 @@ public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Except
184185

185186
// Ensure that the overridden consumer group ID was the one actually used
186187
try (Admin admin = connect.kafka().createAdminClient()) {
187-
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
188+
Collection<GroupListing> consumerGroups = admin.listGroups(ListGroupsOptions.forConsumerGroups()).all().get();
188189
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
189190
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
190191
}
@@ -343,7 +344,7 @@ public void testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exce
343344
alterAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
344345
// Ensure that the overridden consumer group ID was the one actually used
345346
try (Admin admin = connect.kafka().createAdminClient()) {
346-
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
347+
Collection<GroupListing> consumerGroups = admin.listGroups(ListGroupsOptions.forConsumerGroups()).all().get();
347348
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
348349
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
349350
}
@@ -724,7 +725,7 @@ public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exce
724725
resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
725726
// Ensure that the overridden consumer group ID was the one actually used
726727
try (Admin admin = connect.kafka().createAdminClient()) {
727-
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
728+
Collection<GroupListing> consumerGroups = admin.listGroups(ListGroupsOptions.forConsumerGroups()).all().get();
728729
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
729730
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
730731
}

core/src/main/scala/kafka/admin/ConfigCommand.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import joptsimple._
2424
import kafka.server.DynamicConfig
2525
import kafka.utils.Implicits._
2626
import kafka.utils.Logging
27-
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
27+
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListGroupsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
2828
import org.apache.kafka.common.config.ConfigResource
2929
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
3030
import org.apache.kafka.common.internals.Topic
@@ -350,7 +350,7 @@ object ConfigCommand extends Logging {
350350
case ClientMetricsType =>
351351
adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq
352352
case GroupType =>
353-
adminClient.listConsumerGroups().all.get.asScala.map(_.groupId).toSeq
353+
adminClient.listGroups(ListGroupsOptions.forConsumerGroups()).all.get.asScala.map(_.groupId).toSeq
354354
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
355355
})
356356

0 commit comments

Comments
 (0)