Skip to content

Commit 4aa8120

Browse files
KAFKA-19018,KAFKA-19063: Implement maxRecords and acquisition lock timeout in share fetch request and response resp. (#19334)
PR add `MaxRecords` to share fetch request and also adds `AcquisitionLockTimeout` to share fetch response. PR also removes internal broker config of `max.fetch.records`. Reviewers: Andrew Schofield <[email protected]>
1 parent e380968 commit 4aa8120

File tree

21 files changed

+334
-147
lines changed

21 files changed

+334
-147
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfi
180180
return ShareFetchRequest.Builder.forConsumer(
181181
groupId, nextMetadata, fetchConfig.maxWaitMs,
182182
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.maxPollRecords,
183-
added, removed, acknowledgementBatches);
183+
fetchConfig.maxPollRecords, added, removed, acknowledgementBatches);
184184
}
185185

186186
public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String groupId, FetchConfig fetchConfig) {

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
4646
}
4747

4848
public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
49-
int maxWait, int minBytes, int maxBytes, int batchSize,
50-
List<TopicIdPartition> send, List<TopicIdPartition> forget,
49+
int maxWait, int minBytes, int maxBytes, int maxRecords,
50+
int batchSize, List<TopicIdPartition> send, List<TopicIdPartition> forget,
5151
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
5252
ShareFetchRequestData data = new ShareFetchRequestData();
5353
data.setGroupId(groupId);
@@ -62,6 +62,7 @@ public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
6262
data.setMaxWaitMs(maxWait);
6363
data.setMinBytes(minBytes);
6464
data.setMaxBytes(maxBytes);
65+
data.setMaxRecords(maxRecords);
6566
data.setBatchSize(batchSize);
6667

6768
// Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public static int sizeOf(short version,
144144
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator) {
145145
// Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
146146
// use arbitrary values here without affecting the result.
147-
ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList());
147+
ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList(), 0);
148148
ObjectSerializationCache cache = new ObjectSerializationCache();
149149
return 4 + data.size(cache, version);
150150
}
@@ -159,13 +159,13 @@ public static int recordsSize(ShareFetchResponseData.PartitionData partition) {
159159
public static ShareFetchResponse of(Errors error,
160160
int throttleTimeMs,
161161
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData,
162-
List<Node> nodeEndpoints) {
163-
return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints));
162+
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
163+
return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
164164
}
165165

166166
public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
167167
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator,
168-
List<Node> nodeEndpoints) {
168+
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
169169
Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> topicResponseList = new LinkedHashMap<>();
170170
while (partIterator.hasNext()) {
171171
Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> entry = partIterator.next();
@@ -193,6 +193,7 @@ public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
193193
.setRack(endpoint.rack())));
194194
return data.setThrottleTimeMs(throttleTimeMs)
195195
.setErrorCode(error.code())
196+
.setAcquisitionLockTimeoutMs(acquisitionLockTimeout)
196197
.setResponses(new ArrayList<>(topicResponseList.values()));
197198
}
198199

clients/src/main/resources/common/message/ShareFetchRequest.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
{ "name": "MinBytes", "type": "int32", "versions": "0+",
3737
"about": "The minimum bytes to accumulate in the response." },
3838
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
39-
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
39+
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
40+
{ "name": "MaxRecords", "type": "int32", "versions": "0+",
41+
"about": "The maximum number of records to fetch. This limit can be exceeded for alignment of batch boundaries." },
4042
{ "name": "BatchSize", "type": "int32", "versions": "0+",
4143
"about": "The optimal number of records for batches of acquired records and acknowledgements." },
4244
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",

clients/src/main/resources/common/message/ShareFetchResponse.json

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
"about": "The top-level response error code." },
4040
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
4141
"about": "The top-level error message, or null if there was no error." },
42+
{ "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "0+",
43+
"about": "The time in milliseconds for which the acquired records are locked." },
4244
{ "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
4345
"about": "The response topics.", "fields": [
4446
{ "name": "TopicId", "type": "uuid", "versions": "0+",

0 commit comments

Comments
 (0)