Skip to content

Commit b5f8aaf

Browse files
authored
KAFKA-18288: Add support kafka-streams-groups.sh --describe (#19433)
Implement `--describe` and its options: (`--state`, -`-offset`, `--members` and the combination of them with `--verbose`) as described in `KIP-1071`. Reviewers: Lucas Brutschy <[email protected]>, PoAn Yang <[email protected]>
1 parent 42771b6 commit b5f8aaf

File tree

4 files changed

+624
-13
lines changed

4 files changed

+624
-13
lines changed

Diff for: tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java

+238-3
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,30 @@
1818

1919
import org.apache.kafka.clients.CommonClientConfigs;
2020
import org.apache.kafka.clients.admin.Admin;
21+
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
2122
import org.apache.kafka.clients.admin.GroupListing;
23+
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
2224
import org.apache.kafka.clients.admin.ListGroupsOptions;
2325
import org.apache.kafka.clients.admin.ListGroupsResult;
26+
import org.apache.kafka.clients.admin.ListOffsetsResult;
27+
import org.apache.kafka.clients.admin.OffsetSpec;
28+
import org.apache.kafka.clients.admin.StreamsGroupDescription;
29+
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
30+
import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
31+
import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
32+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2433
import org.apache.kafka.common.GroupState;
2534
import org.apache.kafka.common.GroupType;
35+
import org.apache.kafka.common.TopicPartition;
2636
import org.apache.kafka.common.utils.Utils;
2737
import org.apache.kafka.server.util.CommandLineUtils;
2838

2939
import java.io.IOException;
3040
import java.util.ArrayList;
3141
import java.util.Arrays;
3242
import java.util.Collection;
43+
import java.util.HashMap;
44+
import java.util.HashSet;
3345
import java.util.List;
3446
import java.util.Map;
3547
import java.util.Optional;
@@ -49,9 +61,9 @@ public static void main(String[] args) {
4961
opts.checkArgs();
5062

5163
// should have exactly one action
52-
long numberOfActions = Stream.of(opts.listOpt).filter(opts.options::has).count();
64+
long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt).filter(opts.options::has).count();
5365
if (numberOfActions != 1)
54-
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list.");
66+
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, or --describe.");
5567

5668
run(opts);
5769
} catch (OptionException e) {
@@ -63,6 +75,8 @@ public static void run(StreamsGroupCommandOptions opts) {
6375
try (StreamsGroupService streamsGroupService = new StreamsGroupService(opts, Map.of())) {
6476
if (opts.options.has(opts.listOpt)) {
6577
streamsGroupService.listGroups();
78+
} else if (opts.options.has(opts.describeOpt)) {
79+
streamsGroupService.describeGroups();
6680
} else {
6781
throw new IllegalArgumentException("Unknown action!");
6882
}
@@ -79,7 +93,7 @@ static Set<GroupState> groupStatesFromString(String input) {
7993
Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.STREAMS);
8094
if (!validStates.containsAll(parsedStates)) {
8195
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " +
82-
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", ")));
96+
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", ")));
8397
}
8498
return parsedStates;
8599
}
@@ -154,6 +168,224 @@ private void printGroupInfo(List<GroupListing> groups) {
154168
}
155169
}
156170

171+
public void describeGroups() throws ExecutionException, InterruptedException {
172+
List<String> groups = listStreamsGroups();
173+
if (!groups.isEmpty()) {
174+
StreamsGroupDescription description = getDescribeGroup(groups.get(0));
175+
if (description == null)
176+
return;
177+
boolean verbose = opts.options.has(opts.verboseOpt);
178+
if (opts.options.has(opts.membersOpt)) {
179+
printMembers(description, verbose);
180+
} else if (opts.options.has(opts.stateOpt)) {
181+
printStates(description, verbose);
182+
} else {
183+
printOffsets(description, verbose);
184+
}
185+
}
186+
}
187+
188+
StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException {
189+
DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups(List.of(group));
190+
Map<String, StreamsGroupDescription> descriptionMap = result.all().get();
191+
return descriptionMap.get(group);
192+
}
193+
194+
private void printMembers(StreamsGroupDescription description, boolean verbose) {
195+
final int groupLen = Math.max(15, description.groupId().length());
196+
int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
197+
Collection<StreamsGroupMemberDescription> members = description.members();
198+
if (isGroupStateValid(description.groupState(), description.members().size())) {
199+
maybePrintEmptyGroupState(description.groupId(), description.groupState());
200+
for (StreamsGroupMemberDescription member : members) {
201+
maxMemberIdLen = Math.max(maxMemberIdLen, member.memberId().length());
202+
maxHostLen = Math.max(maxHostLen, member.processId().length());
203+
maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length());
204+
}
205+
206+
if (!verbose) {
207+
String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
208+
System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
209+
for (StreamsGroupMemberDescription member : members) {
210+
System.out.printf(fmt, description.groupId(), member.memberId(), member.processId(), member.clientId(),
211+
getTasksForPrinting(member.assignment(), Optional.empty()));
212+
}
213+
} else {
214+
final int targetAssignmentEpochLen = 25, topologyEpochLen = 15, memberProtocolLen = 15, memberEpochLen = 15;
215+
String fmt = "%" + -groupLen + "s %" + -targetAssignmentEpochLen + "s %" + -topologyEpochLen + "s%" + -maxMemberIdLen
216+
+ "s %" + -memberProtocolLen + "s %" + -memberEpochLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
217+
System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
218+
for (StreamsGroupMemberDescription member : members) {
219+
System.out.printf(fmt, description.groupId(), description.targetAssignmentEpoch(), description.topologyEpoch(), member.memberId(),
220+
member.isClassic() ? "classic" : "streams", member.memberEpoch(), member.processId(), member.clientId(), getTasksForPrinting(member.assignment(), Optional.of(member.targetAssignment())));
221+
}
222+
}
223+
}
224+
}
225+
226+
private String prepareTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) {
227+
if (tasks.isEmpty()) {
228+
return "";
229+
}
230+
StringBuilder builder = new StringBuilder(taskType).append(": ");
231+
for (StreamsGroupMemberAssignment.TaskIds taskIds : tasks) {
232+
builder.append(taskIds.subtopologyId()).append(":[");
233+
builder.append(taskIds.partitions().stream().map(String::valueOf).collect(Collectors.joining(",")));
234+
builder.append("]; ");
235+
}
236+
return builder.toString();
237+
}
238+
239+
private String getTasksForPrinting(StreamsGroupMemberAssignment assignment, Optional<StreamsGroupMemberAssignment> targetAssignment) {
240+
StringBuilder builder = new StringBuilder();
241+
builder.append(prepareTaskType(assignment.activeTasks(), "ACTIVE"))
242+
.append(prepareTaskType(assignment.standbyTasks(), "STANDBY"))
243+
.append(prepareTaskType(assignment.warmupTasks(), "WARMUP"));
244+
targetAssignment.ifPresent(target -> builder.append(prepareTaskType(target.activeTasks(), "TARGET-ACTIVE"))
245+
.append(prepareTaskType(target.standbyTasks(), "TARGET-STANDBY"))
246+
.append(prepareTaskType(target.warmupTasks(), "TARGET-WARMUP")));
247+
return builder.toString();
248+
}
249+
250+
private void printStates(StreamsGroupDescription description, boolean verbose) {
251+
maybePrintEmptyGroupState(description.groupId(), description.groupState());
252+
253+
final int groupLen = Math.max(15, description.groupId().length());
254+
String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
255+
256+
final int coordinatorLen = Math.max(25, coordinator.length());
257+
final int stateLen = 25;
258+
if (!verbose) {
259+
String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %" + -stateLen + "s %s\n";
260+
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS");
261+
System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size());
262+
} else {
263+
final int groupEpochLen = 15, targetAssignmentEpochLen = 25;
264+
String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %" + -stateLen + "s %" + -groupEpochLen + "s %" + -targetAssignmentEpochLen + "s %s\n";
265+
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
266+
System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.groupEpoch(), description.targetAssignmentEpoch(), description.members().size());
267+
}
268+
}
269+
270+
private void printOffsets(StreamsGroupDescription description, boolean verbose) throws ExecutionException, InterruptedException {
271+
Map<TopicPartition, OffsetsInfo> offsets = getOffsets(description);
272+
if (isGroupStateValid(description.groupState(), description.members().size())) {
273+
maybePrintEmptyGroupState(description.groupId(), description.groupState());
274+
final int groupLen = Math.max(15, description.groupId().length());
275+
int maxTopicLen = 15;
276+
for (TopicPartition topicPartition : offsets.keySet()) {
277+
maxTopicLen = Math.max(maxTopicLen, topicPartition.topic().length());
278+
}
279+
final int maxPartitionLen = 10;
280+
if (!verbose) {
281+
String fmt = "%" + -groupLen + "s %" + -maxTopicLen + "s %" + -maxPartitionLen + "s %s\n";
282+
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
283+
for (Map.Entry<TopicPartition, OffsetsInfo> offset : offsets.entrySet()) {
284+
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue().lag);
285+
}
286+
} else {
287+
String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %-15s%n";
288+
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
289+
for (Map.Entry<TopicPartition, OffsetsInfo> offset : offsets.entrySet()) {
290+
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(),
291+
offset.getValue().currentOffset.map(Object::toString).orElse("-"), offset.getValue().leaderEpoch.map(Object::toString).orElse("-"),
292+
offset.getValue().logEndOffset, offset.getValue().lag);
293+
}
294+
}
295+
}
296+
}
297+
298+
Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description) throws ExecutionException, InterruptedException {
299+
final Collection<StreamsGroupMemberDescription> members = description.members();
300+
Set<TopicPartition> allTp = new HashSet<>();
301+
for (StreamsGroupMemberDescription memberDescription : members) {
302+
allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks(), description));
303+
}
304+
// fetch latest and earliest offsets
305+
Map<TopicPartition, OffsetSpec> earliest = new HashMap<>();
306+
Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
307+
308+
for (TopicPartition tp : allTp) {
309+
earliest.put(tp, OffsetSpec.earliest());
310+
latest.put(tp, OffsetSpec.latest());
311+
}
312+
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get();
313+
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get();
314+
Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommittedOffsets(description.groupId());
315+
316+
Map<TopicPartition, OffsetsInfo> output = new HashMap<>();
317+
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
318+
final Optional<Long> currentOffset = committedOffsets.containsKey(tp.getKey()) ? Optional.of(committedOffsets.get(tp.getKey()).offset()) : Optional.empty();
319+
final Optional<Integer> leaderEpoch = committedOffsets.containsKey(tp.getKey()) ? committedOffsets.get(tp.getKey()).leaderEpoch() : Optional.empty();
320+
final long lag = currentOffset.map(current -> latestResult.get(tp.getKey()).offset() - current).orElseGet(() -> latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset());
321+
output.put(tp.getKey(),
322+
new OffsetsInfo(
323+
currentOffset,
324+
leaderEpoch,
325+
latestResult.get(tp.getKey()).offset(),
326+
lag));
327+
}
328+
return output;
329+
}
330+
331+
Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
332+
try {
333+
return adminClient.listConsumerGroupOffsets(
334+
Map.of(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
335+
} catch (InterruptedException | ExecutionException e) {
336+
throw new RuntimeException(e);
337+
}
338+
}
339+
340+
/**
341+
* Prints an error message if the group state indicates that the group is either dead or empty.
342+
*
343+
* @param group The ID of the group being checked.
344+
* @param state The current state of the group, represented as a `GroupState` object.
345+
* Possible values include `DEAD` (indicating the group does not exist)
346+
* and `EMPTY` (indicating the group has no active members).
347+
*/
348+
private static void maybePrintEmptyGroupState(String group, GroupState state) {
349+
if (state == GroupState.DEAD) {
350+
printError("Streams group '" + group + "' does not exist.", Optional.empty());
351+
} else if (state == GroupState.EMPTY) {
352+
printError("Streams group '" + group + "' has no active members.", Optional.empty());
353+
}
354+
}
355+
356+
/**
357+
* Checks if the group state is valid based on its state and the number of rows.
358+
*
359+
* @param state The current state of the group, represented as a `GroupState` object.
360+
* @param numRows The number of rows associated with the group.
361+
* @return `true` if the group state is not `DEAD` and the number of rows is greater than 0; otherwise, `false`.
362+
*/
363+
// Visibility for testing
364+
static boolean isGroupStateValid(GroupState state, int numRows) {
365+
return !state.equals(GroupState.DEAD) && numRows > 0;
366+
}
367+
368+
private static Set<TopicPartition> getTopicPartitions(List<StreamsGroupMemberAssignment.TaskIds> taskIds, StreamsGroupDescription description) {
369+
Map<String, List<String>> allSourceTopics = new HashMap<>();
370+
for (StreamsGroupSubtopologyDescription subtopologyDescription : description.subtopologies()) {
371+
allSourceTopics.put(subtopologyDescription.subtopologyId(), subtopologyDescription.sourceTopics());
372+
}
373+
Set<TopicPartition> topicPartitions = new HashSet<>();
374+
375+
for (StreamsGroupMemberAssignment.TaskIds task : taskIds) {
376+
List<String> sourceTopics = allSourceTopics.get(task.subtopologyId());
377+
if (sourceTopics == null) {
378+
throw new IllegalArgumentException("Subtopology " + task.subtopologyId() + " not found in group description!");
379+
}
380+
for (String topic : sourceTopics) {
381+
for (Integer partition : task.partitions()) {
382+
topicPartitions.add(new TopicPartition(topic, partition));
383+
}
384+
}
385+
}
386+
return topicPartitions;
387+
}
388+
157389
public void close() {
158390
adminClient.close();
159391
}
@@ -165,4 +397,7 @@ protected Admin createAdminClient(Map<String, String> configOverrides) throws IO
165397
return Admin.create(props);
166398
}
167399
}
400+
401+
public record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer> leaderEpoch, Long logEndOffset, Long lag) {
402+
}
168403
}

0 commit comments

Comments
 (0)