Skip to content

Commit fa62bce

Browse files
authored
KAFKA-18287: Add support for kafka-streams-groups.sh --list (#19422)
Implement the core of kafka-streams-groups.sh for `KIP-1071` - Implement `--list` and its options: (only `--state`) Reviewers: Bruno Cadonna <[email protected]>
1 parent 2a370ed commit fa62bce

File tree

4 files changed

+440
-0
lines changed

4 files changed

+440
-0
lines changed

bin/kafka-streams-groups.sh

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.streams.StreamsGroupCommand "$@"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.tools.streams;
18+
19+
import org.apache.kafka.clients.CommonClientConfigs;
20+
import org.apache.kafka.clients.admin.Admin;
21+
import org.apache.kafka.clients.admin.GroupListing;
22+
import org.apache.kafka.clients.admin.ListGroupsOptions;
23+
import org.apache.kafka.clients.admin.ListGroupsResult;
24+
import org.apache.kafka.common.GroupState;
25+
import org.apache.kafka.common.GroupType;
26+
import org.apache.kafka.common.utils.Utils;
27+
import org.apache.kafka.server.util.CommandLineUtils;
28+
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.Collection;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Optional;
36+
import java.util.Properties;
37+
import java.util.Set;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.stream.Collectors;
40+
import java.util.stream.Stream;
41+
42+
import joptsimple.OptionException;
43+
44+
public class StreamsGroupCommand {
45+
46+
public static void main(String[] args) {
47+
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
48+
try {
49+
opts.checkArgs();
50+
51+
// should have exactly one action
52+
long numberOfActions = Stream.of(opts.listOpt).filter(opts.options::has).count();
53+
if (numberOfActions != 1)
54+
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list.");
55+
56+
run(opts);
57+
} catch (OptionException e) {
58+
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
59+
}
60+
}
61+
62+
public static void run(StreamsGroupCommandOptions opts) {
63+
try (StreamsGroupService streamsGroupService = new StreamsGroupService(opts, Map.of())) {
64+
if (opts.options.has(opts.listOpt)) {
65+
streamsGroupService.listGroups();
66+
} else {
67+
throw new IllegalArgumentException("Unknown action!");
68+
}
69+
} catch (IllegalArgumentException e) {
70+
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
71+
} catch (Throwable e) {
72+
printError("Executing streams group command failed due to " + e.getMessage(), Optional.of(e));
73+
}
74+
}
75+
76+
static Set<GroupState> groupStatesFromString(String input) {
77+
Set<GroupState> parsedStates =
78+
Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet());
79+
Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.STREAMS);
80+
if (!validStates.containsAll(parsedStates)) {
81+
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " +
82+
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", ")));
83+
}
84+
return parsedStates;
85+
}
86+
87+
public static void printError(String msg, Optional<Throwable> e) {
88+
System.out.println("\nError: " + msg);
89+
e.ifPresent(Throwable::printStackTrace);
90+
}
91+
92+
// Visibility for testing
93+
static class StreamsGroupService implements AutoCloseable {
94+
final StreamsGroupCommandOptions opts;
95+
private final Admin adminClient;
96+
97+
public StreamsGroupService(StreamsGroupCommandOptions opts, Map<String, String> configOverrides) {
98+
this.opts = opts;
99+
try {
100+
this.adminClient = createAdminClient(configOverrides);
101+
} catch (IOException e) {
102+
throw new RuntimeException(e);
103+
}
104+
}
105+
106+
public StreamsGroupService(StreamsGroupCommandOptions opts, Admin adminClient) {
107+
this.opts = opts;
108+
this.adminClient = adminClient;
109+
}
110+
111+
public void listGroups() throws ExecutionException, InterruptedException {
112+
if (opts.options.has(opts.stateOpt)) {
113+
String stateValue = opts.options.valueOf(opts.stateOpt);
114+
Set<GroupState> states = (stateValue == null || stateValue.isEmpty())
115+
? Set.of()
116+
: groupStatesFromString(stateValue);
117+
List<GroupListing> listings = listStreamsGroupsInStates(states);
118+
printGroupInfo(listings);
119+
} else
120+
listStreamsGroups().forEach(System.out::println);
121+
}
122+
123+
List<String> listStreamsGroups() {
124+
try {
125+
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
126+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
127+
.withTypes(Set.of(GroupType.STREAMS)));
128+
Collection<GroupListing> listings = result.all().get();
129+
return listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
130+
} catch (InterruptedException | ExecutionException e) {
131+
throw new RuntimeException(e);
132+
}
133+
}
134+
135+
List<GroupListing> listStreamsGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
136+
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
137+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
138+
.withTypes(Set.of(GroupType.STREAMS))
139+
.inGroupStates(states));
140+
return new ArrayList<>(result.all().get());
141+
}
142+
143+
private void printGroupInfo(List<GroupListing> groups) {
144+
// find proper columns width
145+
int maxGroupLen = 15;
146+
for (GroupListing group : groups) {
147+
maxGroupLen = Math.max(maxGroupLen, group.groupId().length());
148+
}
149+
System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", "STATE");
150+
for (GroupListing group : groups) {
151+
String groupId = group.groupId();
152+
String state = group.groupState().orElse(GroupState.UNKNOWN).toString();
153+
System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, state);
154+
}
155+
}
156+
157+
public void close() {
158+
adminClient.close();
159+
}
160+
161+
protected Admin createAdminClient(Map<String, String> configOverrides) throws IOException {
162+
Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties();
163+
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
164+
props.putAll(configOverrides);
165+
return Admin.create(props);
166+
}
167+
}
168+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.tools.streams;
18+
19+
import org.apache.kafka.server.util.CommandDefaultOptions;
20+
import org.apache.kafka.server.util.CommandLineUtils;
21+
22+
import joptsimple.OptionSpec;
23+
24+
public class StreamsGroupCommandOptions extends CommandDefaultOptions {
25+
public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
26+
public static final String LIST_DOC = "List all streams groups.";
27+
public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
28+
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes.";
29+
public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client.";
30+
public static final String STATE_DOC = "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " +
31+
"Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and Dead.";
32+
33+
public final OptionSpec<String> bootstrapServerOpt;
34+
public final OptionSpec<Void> listOpt;
35+
public final OptionSpec<Long> timeoutMsOpt;
36+
public final OptionSpec<String> commandConfigOpt;
37+
public final OptionSpec<String> stateOpt;
38+
39+
40+
public StreamsGroupCommandOptions(String[] args) {
41+
super(args);
42+
43+
bootstrapServerOpt = parser.accepts("bootstrap-server", BOOTSTRAP_SERVER_DOC)
44+
.withRequiredArg()
45+
.describedAs("server to connect to")
46+
.ofType(String.class);
47+
listOpt = parser.accepts("list", LIST_DOC);
48+
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
49+
.withRequiredArg()
50+
.describedAs("timeout (ms)")
51+
.ofType(Long.class)
52+
.defaultsTo(5000L);
53+
commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
54+
.withRequiredArg()
55+
.describedAs("command config property file")
56+
.ofType(String.class);
57+
stateOpt = parser.accepts("state", STATE_DOC)
58+
.availableIf(listOpt)
59+
.withOptionalArg()
60+
.ofType(String.class);
61+
62+
options = parser.parse(args);
63+
}
64+
65+
public void checkArgs() {
66+
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list streams groups.");
67+
68+
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
69+
}
70+
}

0 commit comments

Comments
 (0)