Skip to content

Commit 6d72677

Browse files
authored
KAFKA-18613: Add StreamsGroupHeartbeat handler in the group coordinator (#19114)
Basic streams group heartbeat handling. The main part of are the unit tests that make sure that we behave, for the most part, like a consumer group. - No support for static membership - No support for configurations (using constants instead) - No support for regular expressions Reviewers: Bill Bejeck <[email protected]>, Bruno Cadonna <[email protected]>
1 parent 4144290 commit 6d72677

14 files changed

+3252
-84
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
400400
* @param context The request context.
401401
* @param request The actual StreamsGroupHeartbeat request.
402402
*
403-
* @return A Result containing the StreamsGroupHeartbeat response, a list of internal topics to be created and
403+
* @return A result containing the StreamsGroupHeartbeat response, a list of internal topics to be created and
404404
* a list of records to update the state machine.
405405
*/
406406
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

+804-42
Large diffs are not rendered by default.

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds(BiFunction<Strin
146146
* @param ownedAssignment A collection of active, standby and warm-up tasks
147147
* @return This object.
148148
*/
149-
protected CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
149+
public CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
150150
this.ownedTasks = Optional.ofNullable(ownedAssignment);
151151
return this;
152152
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

+44-12
Original file line numberDiff line numberDiff line change
@@ -330,29 +330,61 @@ public String staticMemberId(String groupInstanceId) {
330330
}
331331

332332
/**
333-
* Gets or creates a new member but without adding it to the group. Adding a member is done via the
333+
* Gets a new member or throws an exception, if the member does not exist.
334+
*
335+
* @param memberId The member ID.
336+
* @throws UnknownMemberIdException If the member is not found.
337+
* @return A StreamsGroupMember.
338+
*/
339+
public StreamsGroupMember getMemberOrThrow(
340+
String memberId
341+
) throws UnknownMemberIdException {
342+
StreamsGroupMember member = members.get(memberId);
343+
if (member != null) {
344+
return member;
345+
}
346+
347+
throw new UnknownMemberIdException(
348+
String.format("Member %s is not a member of group %s.", memberId, groupId)
349+
);
350+
}
351+
352+
/**
353+
* Gets or creates a new member, but keeping its fields uninitialized. This is used on the replay-path.
354+
* The member is not added to the group, adding a member is done via the
334355
* {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
335356
*
336357
* @param memberId The member ID.
337-
* @param createIfNotExists Booleans indicating whether the member must be created if it does not exist.
338358
* @return A StreamsGroupMember.
339359
*/
340-
public StreamsGroupMember getOrMaybeCreateMember(
341-
String memberId,
342-
boolean createIfNotExists
360+
public StreamsGroupMember getOrCreateUninitializedMember(
361+
String memberId
343362
) throws UnknownMemberIdException {
344363
StreamsGroupMember member = members.get(memberId);
345364
if (member != null) {
346365
return member;
347366
}
348367

349-
if (!createIfNotExists) {
350-
throw new UnknownMemberIdException(
351-
String.format("Member %s is not a member of group %s.", memberId, groupId)
352-
);
368+
return new StreamsGroupMember.Builder(memberId).build();
369+
}
370+
371+
/**
372+
* Gets or creates a new member, setting default values on the fields. This is used on the replay-path.
373+
* The member is not added to the group, adding a member is done via the
374+
* {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
375+
*
376+
* @param memberId The member ID.
377+
* @return A StreamsGroupMember.
378+
*/
379+
public StreamsGroupMember getOrCreateDefaultMember(
380+
String memberId
381+
) throws UnknownMemberIdException {
382+
StreamsGroupMember member = members.get(memberId);
383+
if (member != null) {
384+
return member;
353385
}
354386

355-
return new StreamsGroupMember.Builder(memberId).build();
387+
return StreamsGroupMember.Builder.withDefaults(memberId).build();
356388
}
357389

358390
/**
@@ -363,7 +395,7 @@ public StreamsGroupMember getOrMaybeCreateMember(
363395
*/
364396
public StreamsGroupMember staticMember(String instanceId) {
365397
String existingMemberId = staticMemberId(instanceId);
366-
return existingMemberId == null ? null : getOrMaybeCreateMember(existingMemberId, false);
398+
return existingMemberId == null ? null : getMemberOrThrow(existingMemberId);
367399
}
368400

369401
/**
@@ -656,7 +688,7 @@ public void validateOffsetCommit(
656688
memberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) && groupInstanceId == null)
657689
return;
658690

659-
final StreamsGroupMember member = getOrMaybeCreateMember(memberId, false);
691+
final StreamsGroupMember member = getMemberOrThrow(memberId);
660692

661693
// If the commit is not transactional and the member uses the new streams protocol (KIP-1071),
662694
// the member should be using the OffsetCommit API version >= 9.

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java

+15
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,21 @@ private static Map<String, Set<Integer>> assignmentFromTaskIds(
278278
taskIds -> Set.copyOf(taskIds.partitions())));
279279
}
280280

281+
public static Builder withDefaults(String memberId) {
282+
return new Builder(memberId)
283+
.setRebalanceTimeoutMs(-1)
284+
.setTopologyEpoch(-1)
285+
.setInstanceId(null)
286+
.setRackId(null)
287+
.setProcessId("")
288+
.setClientTags(Collections.emptyMap())
289+
.setState(MemberState.STABLE)
290+
.setMemberEpoch(0)
291+
.setAssignedTasks(TasksTuple.EMPTY)
292+
.setTasksPendingRevocation(TasksTuple.EMPTY)
293+
.setUserEndpoint(null);
294+
}
295+
281296
public StreamsGroupMember build() {
282297
return new StreamsGroupMember(
283298
memberId,

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java

+14
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.coordinator.group.streams;
1818

19+
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
1920
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
2021
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
2122
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
@@ -81,4 +82,17 @@ public static StreamsTopology fromRecord(StreamsGroupTopologyValue record) {
8182
record.subtopologies().stream().collect(Collectors.toMap(Subtopology::subtopologyId, x -> x))
8283
);
8384
}
85+
86+
/**
87+
* Creates an instance of StreamsTopology from a StreamsGroupHeartbeatRequestData request.
88+
*
89+
* @param topology The topology supplied in the request.
90+
* @return The instance of StreamsTopology created from the request.
91+
*/
92+
public static StreamsTopology fromHeartbeatRequest(StreamsGroupHeartbeatRequestData.Topology topology) {
93+
StreamsGroupTopologyValue recordValue = StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology);
94+
final Map<String, StreamsGroupTopologyValue.Subtopology> subtopologyMap = recordValue.subtopologies().stream()
95+
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x));
96+
return new StreamsTopology(topology.epoch(), subtopologyMap);
97+
}
8498
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java

+67-4
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616
*/
1717
package org.apache.kafka.coordinator.group.streams;
1818

19+
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
1920
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
2021

2122
import java.util.Collections;
2223
import java.util.HashMap;
2324
import java.util.HashSet;
25+
import java.util.Iterator;
26+
import java.util.List;
2427
import java.util.Map;
28+
import java.util.Map.Entry;
2529
import java.util.Objects;
2630
import java.util.Set;
2731
import java.util.stream.Collectors;
@@ -30,11 +34,11 @@
3034
* An immutable tuple containing active, standby and warm-up tasks.
3135
*
3236
* @param activeTasks Active tasks.
33-
* The key of the map is the subtopology ID and the value is the set of partition IDs.
37+
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
3438
* @param standbyTasks Standby tasks.
35-
* The key of the map is the subtopology ID and the value is the set of partition IDs.
39+
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
3640
* @param warmupTasks Warm-up tasks.
37-
* The key of the map is the subtopology ID and the value is the set of partition IDs.
41+
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
3842
*/
3943
public record TasksTuple(Map<String, Set<Integer>> activeTasks,
4044
Map<String, Set<Integer>> standbyTasks,
@@ -88,7 +92,7 @@ private static Map<String, Set<Integer>> merge(final Map<String, Set<Integer>> t
8892
/**
8993
* Checks if this task tuple contains any of the tasks in another task tuple.
9094
*
91-
* @param other The other task tuple.
95+
* @param other Another task tuple.
9296
* @return true if there is at least one active, standby or warm-up task that is present in both tuples.
9397
*/
9498
public boolean containsAny(TasksTuple other) {
@@ -130,4 +134,63 @@ public static TasksTuple fromTargetAssignmentRecord(StreamsGroupTargetAssignment
130134
)
131135
);
132136
}
137+
138+
public String toString() {
139+
return "(active=" + taskAssignmentToString(activeTasks) +
140+
", standby=" + taskAssignmentToString(standbyTasks) +
141+
", warmup=" + taskAssignmentToString(warmupTasks) +
142+
')';
143+
}
144+
145+
public static TasksTuple fromHeartbeatRequest(final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks,
146+
final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks,
147+
final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks) {
148+
return new TasksTuple(
149+
ownedActiveTasks.stream()
150+
.collect(Collectors.toMap(
151+
StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId,
152+
taskId -> new HashSet<>(taskId.partitions())
153+
)
154+
),
155+
ownedStandbyTasks.stream()
156+
.collect(Collectors.toMap(
157+
StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId,
158+
taskId -> new HashSet<>(taskId.partitions())
159+
)
160+
),
161+
ownedWarmupTasks.stream()
162+
.collect(Collectors.toMap(
163+
StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId,
164+
taskId -> new HashSet<>(taskId.partitions())
165+
)
166+
)
167+
);
168+
}
169+
170+
/**
171+
* @return The provided assignment as a String.
172+
*
173+
* Example:
174+
* [subtopologyID1-0, subtopologyID1-1, subtopologyID2-0, subtopologyID2-1]
175+
*/
176+
private static String taskAssignmentToString(
177+
Map<String, Set<Integer>> assignment
178+
) {
179+
StringBuilder builder = new StringBuilder("[");
180+
Iterator<Entry<String, Set<Integer>>> subtopologyIterator = assignment.entrySet().iterator();
181+
while (subtopologyIterator.hasNext()) {
182+
Map.Entry<String, Set<Integer>> entry = subtopologyIterator.next();
183+
Iterator<Integer> partitionsIterator = entry.getValue().iterator();
184+
while (partitionsIterator.hasNext()) {
185+
builder.append(entry.getKey());
186+
builder.append("-");
187+
builder.append(partitionsIterator.next());
188+
if (partitionsIterator.hasNext() || subtopologyIterator.hasNext()) {
189+
builder.append(", ");
190+
}
191+
}
192+
}
193+
builder.append("]");
194+
return builder.toString();
195+
}
133196
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MemberAssignment.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
/**
2424
* The task assignment for a Streams group member.
2525
*
26-
* @param activeTasks The target tasks assigned to this member keyed by subtopologyId.
26+
* @param activeTasks The active tasks assigned to this member keyed by subtopologyId.
27+
* @param standbyTasks The standby tasks assigned to this member keyed by subtopologyId.
28+
* @param warmupTasks The warm-up tasks assigned to this member keyed by subtopologyId.
2729
*/
2830
public record MemberAssignment(Map<String, Set<Integer>> activeTasks,
2931
Map<String, Set<Integer>> standbyTasks,

0 commit comments

Comments
 (0)