Skip to content

Commit 3623726

Browse files
authored
KAFKA-18286: Implement support for streams groups in kafka-groups.sh (#19423)
Add support for streams groups in kafka-groups.sh. The change adds command-line options `--streams` to list only streams groups, and value `--group-type streams`. Those two options are mutually exclusive with other group type and protocol filters specified on the command line. Includes a small integration test that spins up a kafka streams application and lists the group. Reviewers: Bill Bejeck <[email protected]>, Alieh Saeedii <[email protected]>
1 parent 99f871a commit 3623726

File tree

2 files changed

+160
-20
lines changed

2 files changed

+160
-20
lines changed

tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java

+19-7
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,19 @@ public GroupsService(Properties config) {
104104
public void listGroups(GroupsCommandOptions opts) throws Exception {
105105
Collection<GroupListing> resources = adminClient.listGroups()
106106
.all().get(30, TimeUnit.SECONDS);
107-
printGroupDetails(resources, opts.groupType(), opts.protocol(), opts.hasConsumerOption(), opts.hasShareOption());
107+
printGroupDetails(resources, opts.groupType(), opts.protocol(), opts.hasConsumerOption(), opts.hasShareOption(), opts.hasStreamsOption());
108108
}
109109

110110
private void printGroupDetails(Collection<GroupListing> groups,
111111
Optional<GroupType> groupTypeFilter,
112112
Optional<String> protocolFilter,
113113
boolean consumerGroupFilter,
114-
boolean shareGroupFilter) {
114+
boolean shareGroupFilter,
115+
boolean streamsGroupFilter) {
115116
List<List<String>> lineItems = new ArrayList<>();
116117
int maxLen = 20;
117118
for (GroupListing group : groups) {
118-
if (combinedFilter(group, groupTypeFilter, protocolFilter, consumerGroupFilter, shareGroupFilter)) {
119+
if (combinedFilter(group, groupTypeFilter, protocolFilter, consumerGroupFilter, shareGroupFilter, streamsGroupFilter)) {
119120
List<String> lineItem = new ArrayList<>();
120121
lineItem.add(group.groupId());
121122
lineItem.add(group.type().map(GroupType::toString).orElse(""));
@@ -145,7 +146,8 @@ private boolean combinedFilter(GroupListing group,
145146
Optional<GroupType> groupTypeFilter,
146147
Optional<String> protocolFilter,
147148
boolean consumerGroupFilter,
148-
boolean shareGroupFilter) {
149+
boolean shareGroupFilter,
150+
boolean streamsGroupFilter) {
149151
boolean pass = true;
150152
Optional<GroupType> groupType = group.type();
151153
String protocol = group.protocol();
@@ -159,6 +161,8 @@ private boolean combinedFilter(GroupListing group,
159161
pass = protocol.equals("consumer") || protocol.isEmpty() || groupType.filter(gt -> gt == GroupType.CONSUMER).isPresent();
160162
} else if (shareGroupFilter) {
161163
pass = groupType.filter(gt -> gt == GroupType.SHARE).isPresent();
164+
} else if (streamsGroupFilter) {
165+
pass = groupType.filter(gt -> gt == GroupType.STREAMS).isPresent();
162166
}
163167
return pass;
164168
}
@@ -189,6 +193,8 @@ public static final class GroupsCommandOptions extends CommandDefaultOptions {
189193

190194
private final OptionSpecBuilder shareOpt;
191195

196+
private final OptionSpecBuilder streamsOpt;
197+
192198
public GroupsCommandOptions(String[] args) {
193199
super(args);
194200
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka server to connect to.")
@@ -204,7 +210,7 @@ public GroupsCommandOptions(String[] args) {
204210
listOpt = parser.accepts("list", "List the groups.");
205211

206212
groupTypeOpt = parser.accepts("group-type", "Filter the groups based on group type. "
207-
+ "Valid types are: 'classic', 'consumer' and 'share'.")
213+
+ "Valid types are: 'classic', 'consumer', 'share' and 'streams'.")
208214
.withRequiredArg()
209215
.describedAs("type")
210216
.ofType(String.class);
@@ -217,6 +223,7 @@ public GroupsCommandOptions(String[] args) {
217223
consumerOpt = parser.accepts("consumer", "Filter the groups to show all kinds of consumer groups, including classic and simple consumer groups. "
218224
+ "This matches group type 'consumer', and group type 'classic' where the protocol type is 'consumer' or empty.");
219225
shareOpt = parser.accepts("share", "Filter the groups to show share groups.");
226+
streamsOpt = parser.accepts("streams", "Filter the groups to show streams groups.");
220227

221228
try {
222229
options = parser.parse(args);
@@ -275,6 +282,10 @@ public boolean hasShareOption() {
275282
return has(shareOpt);
276283
}
277284

285+
public boolean hasStreamsOption() {
286+
return has(streamsOpt);
287+
}
288+
278289
public void checkArgs() {
279290
if (args.length == 0)
280291
CommandLineUtils.printUsageAndExit(parser, "This tool helps to list groups of all types.");
@@ -293,8 +304,9 @@ public void checkArgs() {
293304
}
294305

295306
// check invalid args
296-
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, groupTypeOpt, protocolOpt, shareOpt);
297-
CommandLineUtils.checkInvalidArgs(parser, options, shareOpt, consumerOpt, groupTypeOpt, protocolOpt);
307+
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, groupTypeOpt, protocolOpt, shareOpt, streamsOpt);
308+
CommandLineUtils.checkInvalidArgs(parser, options, shareOpt, consumerOpt, groupTypeOpt, protocolOpt, streamsOpt);
309+
CommandLineUtils.checkInvalidArgs(parser, options, streamsOpt, consumerOpt, groupTypeOpt, protocolOpt, shareOpt);
298310
}
299311
}
300312
}

0 commit comments

Comments
 (0)