Skip to content

Commit 391b258

Browse files
authored
KAFKA-19077: Propagate shutdownRequested field (#19359)
When a member indicates that the application should shut down, set a soft-state flag (not persisted in the log) on the streams group and return the status `SHUTDOWN_APPLICATION` in the heartbeat response of every member for as long as the flag is set. The flag is reset once the group becomes empty. Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai <[email protected]>, Jeff Kim <[email protected]>
1 parent 2a05c0c commit 391b258

File tree

4 files changed

+163
-8
lines changed

4 files changed

+163
-8
lines changed

Diff for: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Endpoint;
6262
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.KeyValue;
6363
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TaskIds;
64-
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TaskOffset;
6564
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Topology;
6665
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
6766
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData.Status;
@@ -2068,8 +2067,6 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
20682067
* @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
20692068
* @param userEndpoint User-defined endpoint for Interactive Queries, or null.
20702069
* @param clientTags Used for rack-aware assignment algorithm, or null.
2071-
* @param taskEndOffsets Cumulative changelog offsets for tasks, or null.
2072-
* @param taskOffsets Cumulative changelog end-offsets for tasks, or null.
20732070
* @param shutdownApplication Whether all Streams clients in the group should shut down.
20742071
* @return A result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
20752072
*/
@@ -2089,8 +2086,6 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
20892086
String processId,
20902087
Endpoint userEndpoint,
20912088
List<KeyValue> clientTags,
2092-
List<TaskOffset> taskOffsets,
2093-
List<TaskOffset> taskEndOffsets,
20942089
boolean shutdownApplication
20952090
) throws ApiException {
20962091
final long currentTimeMs = time.milliseconds();
@@ -2224,6 +2219,9 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
22242219
);
22252220

22262221
scheduleStreamsGroupSessionTimeout(groupId, memberId);
2222+
if (shutdownApplication) {
2223+
group.setShutdownRequestMemberId(memberId);
2224+
}
22272225

22282226
// Prepare the response.
22292227
StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
@@ -2251,6 +2249,15 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
22512249
);
22522250
}
22532251

2252+
group.getShutdownRequestMemberId().ifPresent(requestingMemberId -> returnedStatus.add(
2253+
new Status()
2254+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
2255+
.setStatusDetail(
2256+
String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.",
2257+
requestingMemberId)
2258+
)
2259+
));
2260+
22542261
if (!returnedStatus.isEmpty()) {
22552262
response.setStatus(returnedStatus);
22562263
}
@@ -4843,8 +4850,6 @@ public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streams
48434850
request.processId(),
48444851
request.userEndpoint(),
48454852
request.clientTags(),
4846-
request.taskOffsets(),
4847-
request.taskEndOffsets(),
48484853
request.shutdownApplication()
48494854
);
48504855
}

Diff for: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

+26
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,14 @@ public static class DeadlineAndEpoch {
197197
*/
198198
private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
199199

200+
/**
201+
* Keeps the ID of the first member that requested a shutdown for this group; later requests do not overwrite it.
202+
* This has no direct effect inside the group coordinator, but is propagated to old and new members of the group.
203+
* It is cleared once the group is empty.
204+
* This is not persisted in the log, as the shutdown request is best-effort.
205+
*/
206+
private Optional<String> shutdownRequestMemberId = Optional.empty();
207+
200208
public StreamsGroup(
201209
LogContext logContext,
202210
SnapshotRegistry snapshotRegistry,
@@ -824,6 +832,7 @@ private void maybeUpdateGroupState() {
824832
StreamsGroupState newState = STABLE;
825833
if (members.isEmpty()) {
826834
newState = EMPTY;
835+
clearShutdownRequestMemberId();
827836
} else if (topology().isEmpty() || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
828837
newState = NOT_READY;
829838
} else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
@@ -1049,4 +1058,21 @@ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
10491058
return describedGroup;
10501059
}
10511060

1061+
public void setShutdownRequestMemberId(final String memberId) {
1062+
if (shutdownRequestMemberId.isEmpty()) {
1063+
log.info("[GroupId {}][MemberId {}] Shutdown requested for the streams application.", groupId, memberId);
1064+
shutdownRequestMemberId = Optional.of(memberId);
1065+
}
1066+
}
1067+
1068+
public Optional<String> getShutdownRequestMemberId() {
1069+
return shutdownRequestMemberId;
1070+
}
1071+
1072+
private void clearShutdownRequestMemberId() {
1073+
if (shutdownRequestMemberId.isPresent()) {
1074+
log.info("[GroupId {}] Clearing shutdown requested for the streams application.", groupId);
1075+
shutdownRequestMemberId = Optional.empty();
1076+
}
1077+
}
10521078
}

Diff for: group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

+92
Original file line numberDiff line numberDiff line change
@@ -16278,6 +16278,98 @@ public void testStreamsGroupMemberJoiningWithStaleTopology() {
1627816278
assertRecordsEquals(expectedRecords, result.records());
1627916279
}
1628016280

16281+
@Test
16282+
public void testStreamsGroupMemberRequestingShutdownApplication() {
16283+
String groupId = "fooup";
16284+
String memberId1 = Uuid.randomUuid().toString();
16285+
String memberId2 = Uuid.randomUuid().toString();
16286+
String subtopology1 = "subtopology1";
16287+
String fooTopicName = "foo";
16288+
Uuid fooTopicId = Uuid.randomUuid();
16289+
Topology topology = new Topology().setSubtopologies(List.of(
16290+
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
16291+
));
16292+
16293+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
16294+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
16295+
.withStreamsGroupTaskAssignors(List.of(assignor))
16296+
.withMetadataImage(new MetadataImageBuilder()
16297+
.addTopic(fooTopicId, fooTopicName, 6)
16298+
.build())
16299+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
16300+
.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
16301+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
16302+
.setMemberEpoch(10)
16303+
.setPreviousMemberEpoch(9)
16304+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16305+
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
16306+
.build())
16307+
.withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
16308+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
16309+
.setMemberEpoch(10)
16310+
.setPreviousMemberEpoch(9)
16311+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16312+
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
16313+
.build())
16314+
.withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16315+
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
16316+
.withTargetAssignment(memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16317+
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
16318+
.withTargetAssignmentEpoch(10)
16319+
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
16320+
.withPartitionMetadata(Map.of(
16321+
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
16322+
))
16323+
)
16324+
.build();
16325+
16326+
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result1 = context.streamsGroupHeartbeat(
16327+
new StreamsGroupHeartbeatRequestData()
16328+
.setGroupId(groupId)
16329+
.setMemberId(memberId1)
16330+
.setMemberEpoch(10)
16331+
.setShutdownApplication(true)
16332+
);
16333+
16334+
String statusDetail = String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.", memberId1);
16335+
16336+
assertResponseEquals(
16337+
new StreamsGroupHeartbeatResponseData()
16338+
.setMemberId(memberId1)
16339+
.setMemberEpoch(10)
16340+
.setHeartbeatIntervalMs(5000)
16341+
.setStatus(List.of(
16342+
new StreamsGroupHeartbeatResponseData.Status()
16343+
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
16344+
.setStatusDetail(statusDetail)
16345+
)),
16346+
result1.response().data()
16347+
);
16348+
assertRecordsEquals(List.of(), result1.records());
16349+
16350+
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result2 = context.streamsGroupHeartbeat(
16351+
new StreamsGroupHeartbeatRequestData()
16352+
.setGroupId(groupId)
16353+
.setMemberId(memberId2)
16354+
.setMemberEpoch(10)
16355+
);
16356+
16357+
assertResponseEquals(
16358+
new StreamsGroupHeartbeatResponseData()
16359+
.setMemberId(memberId2)
16360+
.setMemberEpoch(10)
16361+
.setHeartbeatIntervalMs(5000)
16362+
.setStatus(List.of(
16363+
new StreamsGroupHeartbeatResponseData.Status()
16364+
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
16365+
.setStatusDetail(statusDetail)
16366+
)),
16367+
result2.response().data()
16368+
);
16369+
16370+
assertRecordsEquals(List.of(), result2.records());
16371+
}
16372+
1628116373
@Test
1628216374
public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
1628316375
String groupId = "fooup";

Diff for: group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -1106,4 +1106,36 @@ public void testIsSubscribedToTopic() {
11061106
assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
11071107
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
11081108
}
1109-
}
1109+
1110+
@Test
1111+
public void testShutdownRequestedMethods() {
1112+
String memberId1 = "test-member-id1";
1113+
String memberId2 = "test-member-id2";
1114+
LogContext logContext = new LogContext();
1115+
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
1116+
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
1117+
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard);
1118+
1119+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
1120+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
1121+
1122+
// Initially, shutdown should not be requested
1123+
assertTrue(streamsGroup.getShutdownRequestMemberId().isEmpty());
1124+
1125+
// Set shutdown requested
1126+
streamsGroup.setShutdownRequestMemberId(memberId1);
1127+
assertEquals(Optional.of(memberId1), streamsGroup.getShutdownRequestMemberId());
1128+
1129+
// Setting shutdown requested again will be ignored
1130+
streamsGroup.setShutdownRequestMemberId(memberId2);
1131+
assertEquals(Optional.of(memberId1), streamsGroup.getShutdownRequestMemberId());
1132+
1133+
// As long as the group is not empty, it remains in the shutdown-requested state
1134+
streamsGroup.removeMember(memberId1);
1135+
assertEquals(Optional.of(memberId1), streamsGroup.getShutdownRequestMemberId());
1136+
1137+
// As soon as the group is empty, clear the shutdown requested state
1138+
streamsGroup.removeMember(memberId2);
1139+
assertEquals(Optional.empty(), streamsGroup.getShutdownRequestMemberId());
1140+
}
1141+
}

0 commit comments

Comments
 (0)