Skip to content

Commit 270948b

Browse files
KAFKA-19115: Utilize initialized topics info to verify delete share group offsets (#19431)
Currently, in the deleteShareGroupOffsets method in GroupCoordinatorService, the user request was simply forwarded to the persister without checking if the requested share partitions were initialized for the group or not. This PR introduces such a check to make sure that the persister deleteState request is only called for share partitions that have been initialized for the group. Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan <[email protected]>
1 parent fc25436 commit 270948b

File tree

6 files changed

+1290
-221
lines changed

6 files changed

+1290
-221
lines changed

Diff for: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

+66-102
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.kafka.common.message.DeleteGroupsResponseData;
2929
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
3030
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
31-
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
3231
import org.apache.kafka.common.message.DescribeGroupsResponseData;
3332
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
3433
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -1258,45 +1257,40 @@ private CompletableFuture<Map<String, Errors>> persisterDeleteToGroupIdErrorMap(
12581257
});
12591258
}
12601259

1261-
private void populateDeleteShareGroupOffsetsFuture(
1262-
DeleteShareGroupOffsetsRequestData requestData,
1263-
CompletableFuture<DeleteShareGroupOffsetsResponseData> future,
1264-
Map<Uuid, String> requestTopicIdToNameMapping,
1265-
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData,
1266-
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList
1267-
1260+
private CompletableFuture<DeleteShareGroupOffsetsResponseData> persistDeleteShareGroupOffsets(
1261+
DeleteShareGroupStateParameters deleteStateRequestParameters,
1262+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
12681263
) {
1269-
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
1270-
.setGroupId(requestData.groupId())
1271-
.setTopics(deleteShareGroupStateRequestTopicsData);
1272-
1273-
persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData))
1274-
.whenComplete((result, error) -> {
1275-
if (error != null) {
1276-
log.error("Failed to delete share group state");
1277-
future.completeExceptionally(error);
1278-
return;
1279-
}
1264+
return persister.deleteState(deleteStateRequestParameters)
1265+
.thenCompose(result -> {
12801266
if (result == null || result.topicsData() == null) {
12811267
log.error("Result is null for the delete share group state");
1282-
future.completeExceptionally(new IllegalStateException("Result is null for the delete share group state"));
1283-
return;
1268+
Exception exception = new IllegalStateException("Result is null for the delete share group state");
1269+
return CompletableFuture.completedFuture(
1270+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception))
1271+
);
12841272
}
12851273
result.topicsData().forEach(topicData ->
1286-
deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
1287-
.setTopicId(topicData.topicId())
1288-
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
1289-
.setPartitions(topicData.partitions().stream().map(
1290-
partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
1291-
.setPartitionIndex(partitionData.partition())
1292-
.setErrorMessage(partitionData.errorCode() == Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
1293-
.setErrorCode(partitionData.errorCode())
1294-
).toList())
1295-
));
1274+
errorTopicResponseList.add(
1275+
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
1276+
.setTopicId(topicData.topicId())
1277+
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
1278+
.setPartitions(topicData.partitions().stream().map(
1279+
partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
1280+
.setPartitionIndex(partitionData.partition())
1281+
.setErrorMessage(partitionData.errorCode() == Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
1282+
.setErrorCode(partitionData.errorCode())
1283+
).toList())
1284+
)
1285+
);
12961286

1297-
future.complete(
1287+
return CompletableFuture.completedFuture(
12981288
new DeleteShareGroupOffsetsResponseData()
1299-
.setResponses(deleteShareGroupOffsetsResponseTopicList));
1289+
.setResponses(errorTopicResponseList)
1290+
);
1291+
}).exceptionally(throwable -> {
1292+
log.error("Failed to delete share group state");
1293+
return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
13001294
});
13011295
}
13021296

@@ -1590,83 +1584,53 @@ public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf
15901584
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
15911585
}
15921586

1593-
Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
1594-
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size());
1595-
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size());
1596-
1597-
requestData.topics().forEach(topic -> {
1598-
Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName());
1599-
if (topicId != null) {
1600-
requestTopicIdToNameMapping.put(topicId, topic.topicName());
1601-
deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData()
1602-
.setTopicId(topicId)
1603-
.setPartitions(
1604-
topic.partitions().stream().map(
1605-
partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
1606-
).toList()
1607-
));
1608-
} else {
1609-
deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
1610-
.setTopicName(topic.topicName())
1611-
.setPartitions(topic.partitions().stream().map(
1612-
partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
1613-
.setPartitionIndex(partition)
1614-
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
1615-
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
1616-
).toList()));
1617-
}
1618-
});
1619-
1620-
// If the request for the persister is empty, just complete the operation right away.
1621-
if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
1587+
if (requestData.topics() == null || requestData.topics().isEmpty()) {
16221588
return CompletableFuture.completedFuture(
16231589
new DeleteShareGroupOffsetsResponseData()
1624-
.setResponses(deleteShareGroupOffsetsResponseTopicList));
1590+
);
16251591
}
16261592

1627-
CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new CompletableFuture<>();
1593+
return runtime.scheduleReadOperation(
1594+
"share-group-delete-offsets-request",
1595+
topicPartitionFor(groupId),
1596+
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
1597+
)
1598+
.thenCompose(resultHolder -> {
1599+
if (resultHolder == null) {
1600+
log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId);
1601+
return CompletableFuture.completedFuture(
1602+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
1603+
);
1604+
}
1605+
1606+
if (resultHolder.topLevelErrorCode() != Errors.NONE.code()) {
1607+
return CompletableFuture.completedFuture(
1608+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(
1609+
resultHolder.topLevelErrorCode(),
1610+
resultHolder.topLevelErrorMessage()
1611+
)
1612+
);
1613+
}
16281614

1629-
TopicPartition topicPartition = topicPartitionFor(groupId);
1615+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList =
1616+
resultHolder.errorTopicResponseList() == null ? new ArrayList<>() : new ArrayList<>(resultHolder.errorTopicResponseList());
16301617

1631-
// This is done to make sure the provided group is empty. Offsets can be deleted only for an empty share group.
1632-
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> describeGroupFuture =
1633-
runtime.scheduleReadOperation(
1634-
"share-group-describe",
1635-
topicPartition,
1636-
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset)
1637-
).exceptionally(exception -> handleOperationException(
1638-
"share-group-describe",
1639-
List.of(groupId),
1640-
exception,
1641-
(error, __) -> ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error),
1642-
log
1643-
));
1618+
if (resultHolder.deleteStateRequestParameters() == null) {
1619+
return CompletableFuture.completedFuture(
1620+
new DeleteShareGroupOffsetsResponseData()
1621+
.setResponses(errorTopicResponseList)
1622+
);
1623+
}
16441624

1645-
describeGroupFuture.whenComplete((groups, throwable) -> {
1646-
if (throwable != null) {
1647-
log.error("Failed to describe the share group {}", groupId, throwable);
1648-
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)));
1649-
} else if (groups == null || groups.isEmpty()) {
1650-
log.error("Describe share group resulted in empty response for group {}", groupId);
1651-
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
1652-
} else if (groups.get(0).errorCode() != Errors.NONE.code()) {
1653-
log.error("Failed to describe the share group {}", groupId);
1654-
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(groups.get(0).errorCode(), groups.get(0).errorMessage()));
1655-
} else if (groups.get(0).members() != null && !groups.get(0).members().isEmpty()) {
1656-
log.error("Provided group {} is not empty", groupId);
1657-
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.NON_EMPTY_GROUP));
1658-
} else {
1659-
populateDeleteShareGroupOffsetsFuture(
1660-
requestData,
1661-
future,
1662-
requestTopicIdToNameMapping,
1663-
deleteShareGroupStateRequestTopicsData,
1664-
deleteShareGroupOffsetsResponseTopicList
1625+
return persistDeleteShareGroupOffsets(
1626+
resultHolder.deleteStateRequestParameters(),
1627+
errorTopicResponseList
16651628
);
1666-
}
1667-
});
1668-
1669-
return future;
1629+
})
1630+
.exceptionally(throwable -> {
1631+
log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId, throwable);
1632+
return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
1633+
});
16701634
}
16711635

16721636
/**

Diff for: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

+118
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
2727
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
2828
import org.apache.kafka.common.message.DeleteGroupsResponseData;
29+
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
30+
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
31+
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
2932
import org.apache.kafka.common.message.DescribeGroupsResponseData;
3033
import org.apache.kafka.common.message.HeartbeatRequestData;
3134
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -131,6 +134,7 @@
131134
import java.util.HashSet;
132135
import java.util.List;
133136
import java.util.Map;
137+
import java.util.Objects;
134138
import java.util.Optional;
135139
import java.util.Set;
136140
import java.util.concurrent.CompletableFuture;
@@ -290,6 +294,69 @@ public GroupCoordinatorShard build() {
290294
}
291295
}
292296

297+
public static class DeleteShareGroupOffsetsResultHolder {
298+
private final short topLevelErrorCode;
299+
private final String topLevelErrorMessage;
300+
private final List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList;
301+
private final DeleteShareGroupStateParameters deleteStateRequestParameters;
302+
303+
DeleteShareGroupOffsetsResultHolder(short topLevelErrorCode, String topLevelErrorMessage) {
304+
this(topLevelErrorCode, topLevelErrorMessage, null, null);
305+
}
306+
307+
DeleteShareGroupOffsetsResultHolder(
308+
short topLevelErrorCode,
309+
String topLevelErrorMessage,
310+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
311+
) {
312+
this(topLevelErrorCode, topLevelErrorMessage, errorTopicResponseList, null);
313+
}
314+
315+
DeleteShareGroupOffsetsResultHolder(
316+
short topLevelErrorCode,
317+
String topLevelErrorMessage,
318+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList,
319+
DeleteShareGroupStateParameters deleteStateRequestParameters
320+
) {
321+
this.topLevelErrorCode = topLevelErrorCode;
322+
this.topLevelErrorMessage = topLevelErrorMessage;
323+
this.errorTopicResponseList = errorTopicResponseList;
324+
this.deleteStateRequestParameters = deleteStateRequestParameters;
325+
}
326+
327+
public short topLevelErrorCode() {
328+
return this.topLevelErrorCode;
329+
}
330+
331+
public String topLevelErrorMessage() {
332+
return this.topLevelErrorMessage;
333+
}
334+
335+
public List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList() {
336+
return this.errorTopicResponseList;
337+
}
338+
339+
public DeleteShareGroupStateParameters deleteStateRequestParameters() {
340+
return this.deleteStateRequestParameters;
341+
}
342+
343+
@Override
344+
public boolean equals(Object o) {
345+
if (this == o) return true;
346+
if (o == null || getClass() != o.getClass()) return false;
347+
DeleteShareGroupOffsetsResultHolder other = (DeleteShareGroupOffsetsResultHolder) o;
348+
return topLevelErrorCode == other.topLevelErrorCode &&
349+
Objects.equals(topLevelErrorMessage, other.topLevelErrorMessage) &&
350+
Objects.equals(errorTopicResponseList, other.errorTopicResponseList) &&
351+
Objects.equals(deleteStateRequestParameters, other.deleteStateRequestParameters);
352+
}
353+
354+
@Override
355+
public int hashCode() {
356+
return Objects.hash(topLevelErrorCode, topLevelErrorMessage, errorTopicResponseList, deleteStateRequestParameters);
357+
}
358+
}
359+
293360
/**
294361
* The group/offsets expiration key to schedule a timer task.
295362
*
@@ -613,6 +680,57 @@ public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters,
613680
return new CoordinatorResult<>(records, responseMap);
614681
}
615682

683+
/**
684+
* Does the following checks to make sure that a DeleteShareGroupOffsets request is valid and can be processed further
685+
* 1. Checks whether the provided group is empty
686+
* 2. Checks the requested topics are presented in the metadataImage
687+
* 3. Checks the requested share partitions are initialized for the group
688+
*
689+
* @param groupId - The group ID
690+
* @param requestData - The request data for DeleteShareGroupOffsetsRequest
691+
* @return {@link DeleteShareGroupOffsetsResultHolder} an object containing top level error code, list of topic responses
692+
* and persister deleteState request parameters
693+
*/
694+
public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
695+
String groupId,
696+
DeleteShareGroupOffsetsRequestData requestData
697+
) {
698+
try {
699+
ShareGroup group = groupMetadataManager.shareGroup(groupId);
700+
group.validateDeleteGroup();
701+
702+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
703+
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData =
704+
groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
705+
groupId,
706+
requestData,
707+
errorTopicResponseList
708+
);
709+
710+
if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
711+
return new DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList);
712+
}
713+
714+
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
715+
.setGroupId(requestData.groupId())
716+
.setTopics(deleteShareGroupStateRequestTopicsData);
717+
718+
return new DeleteShareGroupOffsetsResultHolder(
719+
Errors.NONE.code(),
720+
null,
721+
errorTopicResponseList,
722+
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
723+
);
724+
725+
} catch (GroupIdNotFoundException exception) {
726+
log.error("groupId {} not found", groupId, exception);
727+
return new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage());
728+
} catch (GroupNotEmptyException exception) {
729+
log.error("Provided group {} is not empty", groupId);
730+
return new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage());
731+
}
732+
}
733+
616734
/**
617735
* Fetch offsets for a given set of partitions and a given group.
618736
*

0 commit comments

Comments
 (0)