Skip to content

KAFKA-19018,KAFKA-19063: Implement maxRecords and acquisition lock timeout in share fetch request and response resp. #19334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfi
return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, fetchConfig.maxWaitMs,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.maxPollRecords,
added, removed, acknowledgementBatches);
fetchConfig.maxPollRecords, added, removed, acknowledgementBatches);
}

public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String groupId, FetchConfig fetchConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
}

public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
int maxWait, int minBytes, int maxBytes, int batchSize,
List<TopicIdPartition> send, List<TopicIdPartition> forget,
int maxWait, int minBytes, int maxBytes, int maxRecords,
int batchSize, List<TopicIdPartition> send, List<TopicIdPartition> forget,
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData();
data.setGroupId(groupId);
Expand All @@ -62,6 +62,7 @@ public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
data.setMaxWaitMs(maxWait);
data.setMinBytes(minBytes);
data.setMaxBytes(maxBytes);
data.setMaxRecords(maxRecords);
data.setBatchSize(batchSize);

// Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public static int sizeOf(short version,
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator) {
// Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
// use arbitrary values here without affecting the result.
ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList());
ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList(), 0);
ObjectSerializationCache cache = new ObjectSerializationCache();
return 4 + data.size(cache, version);
}
Expand All @@ -159,13 +159,13 @@ public static int recordsSize(ShareFetchResponseData.PartitionData partition) {
public static ShareFetchResponse of(Errors error,
int throttleTimeMs,
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData,
List<Node> nodeEndpoints) {
return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints));
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
}

public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator,
List<Node> nodeEndpoints) {
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> topicResponseList = new LinkedHashMap<>();
while (partIterator.hasNext()) {
Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> entry = partIterator.next();
Expand Down Expand Up @@ -193,6 +193,7 @@ public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
.setRack(endpoint.rack())));
return data.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code())
.setAcquisitionLockTimeoutMs(acquisitionLockTimeout)
.setResponses(new ArrayList<>(topicResponseList.values()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
{ "name": "MinBytes", "type": "int32", "versions": "0+",
"about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
{ "name": "MaxRecords", "type": "int32", "versions": "0+",
"about": "The maximum number of records to fetch. This limit can be exceeded for alignment of batch boundaries." },
{ "name": "BatchSize", "type": "int32", "versions": "0+",
"about": "The optimal number of records for batches of acquired records and acknowledgements." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
"about": "The top-level response error code." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "0+",
"about": "The time in milliseconds for which the acquired records are locked." },
{ "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
"about": "The response topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
Expand Down
Loading