Skip to content

Commit b649b1e

Browse files
authored
KAFKA-18935: Ensure brokers do not return null records in FetchResponse (#19167)
JIRA: KAFKA-18935 This patch ensures the broker will not return null records in FetchResponse. For more details, please refer to the ticket. Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
1 parent 6430fb5 commit b649b1e

File tree

15 files changed

+176
-112
lines changed

15 files changed

+176
-112
lines changed

clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
360360
.setPartitions(partitionResponses));
361361
});
362362
}
363-
return new FetchResponse(new FetchResponseData()
363+
return FetchResponse.of(new FetchResponseData()
364364
.setThrottleTimeMs(throttleTimeMs)
365365
.setErrorCode(error.code())
366366
.setSessionId(data.sessionId())

clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public FetchResponseData data() {
8787
* We may also return INCONSISTENT_TOPIC_ID error as a partition-level error when a partition in the session has a topic ID
8888
* inconsistent with the log.
8989
*/
90-
public FetchResponse(FetchResponseData fetchResponseData) {
90+
private FetchResponse(FetchResponseData fetchResponseData) {
9191
super(ApiKeys.FETCH);
9292
this.data = fetchResponseData;
9393
}
@@ -138,6 +138,13 @@ public Map<Errors, Integer> errorCounts() {
138138
return errorCounts;
139139
}
140140

141+
/**
142+
* Creates a {@link org.apache.kafka.common.requests.FetchResponse} from the given byte buffer.
143+
* Unlike {@link org.apache.kafka.common.requests.FetchResponse#of(FetchResponseData)}, this method doesn't convert
144+
* null records to {@link org.apache.kafka.common.record.MemoryRecords#EMPTY}.
145+
*
146+
* <p><strong>This method should only be used in client-side.</strong></p>
147+
*/
141148
public static FetchResponse parse(ByteBuffer buffer, short version) {
142149
return new FetchResponse(new FetchResponseData(new ByteBufferAccessor(buffer), version));
143150
}
@@ -220,6 +227,23 @@ public static int recordsSize(FetchResponseData.PartitionData partition) {
220227
return partition.records() == null ? 0 : partition.records().sizeInBytes();
221228
}
222229

230+
/**
231+
* Creates a {@link org.apache.kafka.common.requests.FetchResponse} from the given data.
232+
* This method converts null records to {@link org.apache.kafka.common.record.MemoryRecords#EMPTY}
233+
* to ensure consistent record representation in the response.
234+
*
235+
* <p><strong>This method should only be used in server-side.</strong></p>
236+
*/
237+
public static FetchResponse of(FetchResponseData data) {
238+
for (FetchResponseData.FetchableTopicResponse response : data.responses()) {
239+
for (FetchResponseData.PartitionData partition : response.partitions()) {
240+
if (partition.records() == null)
241+
partition.setRecords(MemoryRecords.EMPTY);
242+
}
243+
}
244+
return new FetchResponse(data);
245+
}
246+
223247
// TODO: remove as a part of KAFKA-12410
224248
public static FetchResponse of(Errors error,
225249
int throttleTimeMs,
@@ -258,6 +282,11 @@ private static FetchResponseData toMessage(Errors error,
258282
FetchResponseData.PartitionData partitionData = entry.getValue();
259283
// Since PartitionData alone doesn't know the partition ID, we set it here
260284
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
285+
// To protect the clients from failing due to null records,
286+
// we always convert null records to MemoryRecords.EMPTY
287+
// We will propose a KIP to change the schema definitions in the future
288+
if (partitionData.records() == null)
289+
partitionData.setRecords(MemoryRecords.EMPTY);
261290
// We have to keep the order of input topic-partition. Hence, we batch the partitions only if the last
262291
// batch is in the same topic group.
263292
FetchResponseData.FetchableTopicResponse previousTopic = topicResponseList.isEmpty() ? null

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
import org.apache.kafka.common.TopicPartition;
2121
import org.apache.kafka.common.Uuid;
2222
import org.apache.kafka.common.message.ShareFetchRequestData;
23-
import org.apache.kafka.common.message.ShareFetchResponseData;
2423
import org.apache.kafka.common.protocol.ApiKeys;
2524
import org.apache.kafka.common.protocol.Errors;
2625
import org.apache.kafka.common.protocol.Readable;
2726

2827
import java.util.ArrayList;
2928
import java.util.HashMap;
29+
import java.util.LinkedHashMap;
3030
import java.util.List;
3131
import java.util.Map;
3232

@@ -161,9 +161,7 @@ public ShareFetchRequestData data() {
161161
@Override
162162
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
163163
Errors error = Errors.forException(e);
164-
return new ShareFetchResponse(new ShareFetchResponseData()
165-
.setThrottleTimeMs(throttleTimeMs)
166-
.setErrorCode(error.code()));
164+
return ShareFetchResponse.of(error, throttleTimeMs, new LinkedHashMap<>(), List.of(), 0);
167165
}
168166

169167
public static ShareFetchRequest parse(Readable readable, short version) {

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java

+23-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class ShareFetchResponse extends AbstractResponse {
5555

5656
private final ShareFetchResponseData data;
5757

58-
public ShareFetchResponse(ShareFetchResponseData data) {
58+
private ShareFetchResponse(ShareFetchResponseData data) {
5959
super(ApiKeys.SHARE_FETCH);
6060
this.data = data;
6161
}
@@ -103,6 +103,13 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
103103
data.setThrottleTimeMs(throttleTimeMs);
104104
}
105105

106+
/**
107+
* Creates a {@link org.apache.kafka.common.requests.ShareFetchResponse} from the given byte buffer.
108+
* Unlike {@link org.apache.kafka.common.requests.ShareFetchResponse#of(Errors, int, LinkedHashMap, List, int)},
109+
* this method doesn't convert null records to {@link org.apache.kafka.common.record.MemoryRecords#EMPTY}.
110+
*
111+
* <p><strong>This method should only be used in client-side.</strong></p>
112+
*/
106113
public static ShareFetchResponse parse(ByteBuffer buffer, short version) {
107114
return new ShareFetchResponse(
108115
new ShareFetchResponseData(new ByteBufferAccessor(buffer), version)
@@ -145,14 +152,21 @@ public static int recordsSize(ShareFetchResponseData.PartitionData partition) {
145152
return partition.records() == null ? 0 : partition.records().sizeInBytes();
146153
}
147154

155+
/**
156+
* Creates a {@link org.apache.kafka.common.requests.ShareFetchResponse} from the given data.
157+
* This method converts null records to {@link org.apache.kafka.common.record.MemoryRecords#EMPTY}
158+
* to ensure consistent record representation in the response.
159+
*
160+
* <p><strong>This method should only be used in server-side.</strong></p>
161+
*/
148162
public static ShareFetchResponse of(Errors error,
149163
int throttleTimeMs,
150164
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData,
151165
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
152166
return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
153167
}
154168

155-
public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
169+
private static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
156170
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator,
157171
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
158172
Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> topicResponseList = new LinkedHashMap<>();
@@ -161,6 +175,11 @@ public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
161175
ShareFetchResponseData.PartitionData partitionData = entry.getValue();
162176
// Since PartitionData alone doesn't know the partition ID, we set it here
163177
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
178+
// To protect the clients from failing due to null records,
179+
// we always convert null records to MemoryRecords.EMPTY
180+
// We will propose a KIP to change the schema definitions in the future
181+
if (partitionData.records() == null)
182+
partitionData.setRecords(MemoryRecords.EMPTY);
164183
// Checking if the topic is already present in the map
165184
if (topicResponseList.containsKey(entry.getKey().topicId())) {
166185
topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
@@ -193,6 +212,7 @@ public static ShareFetchResponseData.PartitionData partitionResponse(TopicIdPart
193212
public static ShareFetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
194213
return new ShareFetchResponseData.PartitionData()
195214
.setPartitionIndex(partition)
196-
.setErrorCode(error.code());
215+
.setErrorCode(error.code())
216+
.setRecords(MemoryRecords.EMPTY);
197217
}
198218
}

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.nio.ByteBuffer;
5757
import java.time.Duration;
5858
import java.util.HashMap;
59+
import java.util.LinkedHashMap;
5960
import java.util.LinkedList;
6061
import java.util.List;
6162
import java.util.Map;
@@ -392,13 +393,7 @@ private ShareFetchResponse shareFetchResponse(TopicIdPartition tip, int count) {
392393
.setPartitionIndex(tip.partition())
393394
.setRecords(records)
394395
.setAcquiredRecords(List.of(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(count - 1).setDeliveryCount((short) 1)));
395-
ShareFetchResponseData.ShareFetchableTopicResponse topicResponse = new ShareFetchResponseData.ShareFetchableTopicResponse()
396-
.setTopicId(tip.topicId())
397-
.setPartitions(List.of(partData));
398-
return new ShareFetchResponse(
399-
new ShareFetchResponseData()
400-
.setResponses(List.of(topicResponse))
401-
);
396+
return ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(Map.of(tip, partData)), List.of(), 0);
402397
}
403398

404399
private ShareAcknowledgeResponse shareAcknowledgeResponse() {

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java

+47-63
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,14 @@ private static final class RespEntry {
139139
}
140140
}
141141

142-
private static List<ShareFetchResponseData.ShareFetchableTopicResponse> respList(RespEntry... entries) {
143-
HashMap<TopicIdPartition, ShareFetchResponseData.ShareFetchableTopicResponse> map = new HashMap<>();
142+
private static LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> buildResponseData(RespEntry... entries) {
143+
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> topicIdPartitionToPartition = new LinkedHashMap<>();
144144
for (RespEntry entry : entries) {
145-
ShareFetchResponseData.ShareFetchableTopicResponse response = map.computeIfAbsent(entry.part, topicIdPartition ->
146-
new ShareFetchResponseData.ShareFetchableTopicResponse().setTopicId(topicIdPartition.topicId()));
147-
response.partitions().add(new ShareFetchResponseData.PartitionData()
148-
.setPartitionIndex(entry.part.partition()));
145+
ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
146+
.setPartitionIndex(entry.part.partition());
147+
topicIdPartitionToPartition.put(entry.part, partitionData);
149148
}
150-
return new ArrayList<>(map.values());
149+
return topicIdPartitionToPartition;
151150
}
152151

153152
@Test
@@ -170,13 +169,11 @@ public void testShareSession() {
170169
assertListEquals(expectedToSend1, reqFetchList(requestData1, topicNames));
171170
assertEquals(memberId.toString(), requestData1.memberId());
172171

173-
ShareFetchResponse resp = new ShareFetchResponse(
174-
new ShareFetchResponseData()
175-
.setErrorCode(Errors.NONE.code())
176-
.setThrottleTimeMs(0)
177-
.setResponses(respList(
178-
new RespEntry("foo", 0, fooId),
179-
new RespEntry("foo", 1, fooId))));
172+
ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
173+
0,
174+
buildResponseData(new RespEntry("foo", 0, fooId), new RespEntry("foo", 1, fooId)),
175+
List.of(),
176+
0);
180177
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
181178

182179
// Test a fetch request which adds one partition
@@ -194,18 +191,15 @@ public void testShareSession() {
194191
expectedToSend2.add(new TopicIdPartition(barId, 0, "bar"));
195192
assertListEquals(expectedToSend2, reqFetchList(requestData2, topicNames));
196193

197-
ShareFetchResponse resp2 = new ShareFetchResponse(
198-
new ShareFetchResponseData()
199-
.setErrorCode(Errors.NONE.code())
200-
.setThrottleTimeMs(0)
201-
.setResponses(respList(
202-
new RespEntry("foo", 1, fooId))));
194+
ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.NONE,
195+
0,
196+
buildResponseData(new RespEntry("foo", 1, fooId)),
197+
List.of(),
198+
0);
203199
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
204200

205201
// A top-level error code will reset the session epoch
206-
ShareFetchResponse resp3 = new ShareFetchResponse(
207-
new ShareFetchResponseData()
208-
.setErrorCode(Errors.INVALID_SHARE_SESSION_EPOCH.code()));
202+
ShareFetchResponse resp3 = ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new LinkedHashMap<>(), List.of(), 0);
209203
handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true));
210204

211205
ShareFetchRequestData requestData4 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
@@ -251,14 +245,14 @@ public void testPartitionRemoval() {
251245
assertListEquals(expectedToSend1, reqFetchList(requestData1, topicNames));
252246
assertEquals(memberId.toString(), requestData1.memberId());
253247

254-
ShareFetchResponse resp = new ShareFetchResponse(
255-
new ShareFetchResponseData()
256-
.setErrorCode(Errors.NONE.code())
257-
.setThrottleTimeMs(0)
258-
.setResponses(respList(
259-
new RespEntry("foo", 0, fooId),
260-
new RespEntry("foo", 1, fooId),
261-
new RespEntry("bar", 0, barId))));
248+
ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
249+
0,
250+
buildResponseData(
251+
new RespEntry("foo", 0, fooId),
252+
new RespEntry("foo", 1, fooId),
253+
new RespEntry("bar", 0, barId)),
254+
List.of(),
255+
0);
262256
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
263257

264258
// Test a fetch request which removes two partitions
@@ -275,9 +269,7 @@ public void testPartitionRemoval() {
275269
assertListEquals(expectedToForget2, reqForgetList(requestData2, topicNames));
276270

277271
// A top-level error code will reset the session epoch
278-
ShareFetchResponse resp2 = new ShareFetchResponse(
279-
new ShareFetchResponseData()
280-
.setErrorCode(Errors.INVALID_SHARE_SESSION_EPOCH.code()));
272+
ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new LinkedHashMap<>(), List.of(), 0);
281273
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
282274

283275
handler.addPartitionToFetch(foo1, null);
@@ -309,12 +301,11 @@ public void testTopicIdReplaced() {
309301
expectedToSend1.add(new TopicIdPartition(topicId1, 0, "foo"));
310302
assertListEquals(expectedToSend1, reqFetchList(requestData1, topicNames));
311303

312-
ShareFetchResponse resp = new ShareFetchResponse(
313-
new ShareFetchResponseData()
314-
.setErrorCode(Errors.NONE.code())
315-
.setThrottleTimeMs(0)
316-
.setResponses(respList(
317-
new RespEntry("foo", 0, topicId1))));
304+
ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
305+
0,
306+
buildResponseData(new RespEntry("foo", 0, topicId1)),
307+
List.of(),
308+
0);
318309
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
319310

320311
// Try to add a new topic ID
@@ -354,12 +345,11 @@ public void testPartitionForgottenOnAcknowledgeOnly() {
354345
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
355346
assertListEquals(expectedToSend1, reqFetchList(requestData1, topicNames));
356347

357-
ShareFetchResponse resp = new ShareFetchResponse(
358-
new ShareFetchResponseData()
359-
.setErrorCode(Errors.NONE.code())
360-
.setThrottleTimeMs(0)
361-
.setResponses(respList(
362-
new RespEntry("foo", 0, topicId))));
348+
ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
349+
0,
350+
buildResponseData(new RespEntry("foo", 0, topicId)),
351+
List.of(),
352+
0);
363353
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
364354

365355
// Remove the topic from the session by setting acknowledgements only - this is not asking to fetch records
@@ -390,12 +380,11 @@ public void testForgottenPartitions() {
390380
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
391381
assertListEquals(expectedToSend1, reqFetchList(requestData1, topicNames));
392382

393-
ShareFetchResponse resp = new ShareFetchResponse(
394-
new ShareFetchResponseData()
395-
.setErrorCode(Errors.NONE.code())
396-
.setThrottleTimeMs(0)
397-
.setResponses(respList(
398-
new RespEntry("foo", 0, topicId))));
383+
ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
384+
0,
385+
buildResponseData(new RespEntry("foo", 0, topicId)),
386+
List.of(),
387+
0);
399388
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
400389

401390
// Remove the topic from the session
@@ -424,23 +413,18 @@ public void testAddNewIdAfterTopicRemovedFromSession() {
424413
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
425414
assertListEquals(expectedToSend1, reqFetchList(requestData1, topicNames));
426415

427-
ShareFetchResponse resp = new ShareFetchResponse(
428-
new ShareFetchResponseData()
429-
.setErrorCode(Errors.NONE.code())
430-
.setThrottleTimeMs(0)
431-
.setResponses(respList(
432-
new RespEntry("foo", 0, topicId))));
416+
ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
417+
0,
418+
buildResponseData(new RespEntry("foo", 0, topicId)),
419+
List.of(),
420+
0);
433421
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
434422

435423
// Remove the partition from the session
436424
ShareFetchRequestData requestData2 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
437425
assertTrue(handler.sessionPartitionMap().isEmpty());
438426
assertTrue(requestData2.topics().isEmpty());
439-
ShareFetchResponse resp2 = new ShareFetchResponse(
440-
new ShareFetchResponseData()
441-
.setErrorCode(Errors.NONE.code())
442-
.setThrottleTimeMs(0)
443-
.setResponses(respList()));
427+
ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(), List.of(), 0);
444428
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
445429

446430
// After the topic is removed, add a recreated topic with a new ID

0 commit comments

Comments
 (0)