Skip to content

Commit ebb6281

Browse files
authored
KAFKA-19074 Remove the cached responseData from ShareFetchResponse (#19357)
Jira: https://issues.apache.org/jira/browse/KAFKA-19074 Similar fix #16532 2b8aff5 make it accept input to return "partial" data. The content of output is based on the input but we cache the output ... It will return same output even though we pass different input. That is a potential bug. Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 08a93fe commit ebb6281

File tree

2 files changed

+54
-18
lines changed

2 files changed

+54
-18
lines changed

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java

+7-18
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ public class ShareFetchResponse extends AbstractResponse {
5555

5656
private final ShareFetchResponseData data;
5757

58-
private volatile LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = null;
59-
6058
public ShareFetchResponse(ShareFetchResponseData data) {
6159
super(ApiKeys.SHARE_FETCH);
6260
this.data = data;
@@ -84,23 +82,14 @@ public Map<Errors, Integer> errorCounts() {
8482
}
8583

8684
public LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames) {
87-
if (responseData == null) {
88-
synchronized (this) {
89-
// Assigning the lazy-initialized `responseData` in the last step
90-
// to avoid other threads accessing a half-initialized object.
91-
if (responseData == null) {
92-
final LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseDataTmp = new LinkedHashMap<>();
93-
data.responses().forEach(topicResponse -> {
94-
String name = topicNames.get(topicResponse.topicId());
95-
if (name != null) {
96-
topicResponse.partitions().forEach(partitionData -> responseDataTmp.put(new TopicIdPartition(topicResponse.topicId(),
97-
new TopicPartition(name, partitionData.partitionIndex())), partitionData));
98-
}
99-
});
100-
responseData = responseDataTmp;
101-
}
85+
final LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
86+
data.responses().forEach(topicResponse -> {
87+
String name = topicNames.get(topicResponse.topicId());
88+
if (name != null) {
89+
topicResponse.partitions().forEach(partitionData -> responseData.put(new TopicIdPartition(topicResponse.topicId(),
90+
new TopicPartition(name, partitionData.partitionIndex())), partitionData));
10291
}
103-
}
92+
});
10493
return responseData;
10594
}
10695

server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java

+47
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,23 @@
1919
import org.apache.kafka.common.TopicIdPartition;
2020
import org.apache.kafka.common.TopicPartition;
2121
import org.apache.kafka.common.Uuid;
22+
import org.apache.kafka.common.compress.Compression;
23+
import org.apache.kafka.common.message.ShareFetchResponseData;
2224
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
25+
import org.apache.kafka.common.protocol.Errors;
26+
import org.apache.kafka.common.record.MemoryRecords;
27+
import org.apache.kafka.common.record.MemoryRecordsBuilder;
28+
import org.apache.kafka.common.record.TimestampType;
29+
import org.apache.kafka.common.requests.ShareFetchResponse;
2330
import org.apache.kafka.server.storage.log.FetchParams;
2431
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
2532

2633
import org.junit.jupiter.api.AfterEach;
2734
import org.junit.jupiter.api.BeforeEach;
2835
import org.junit.jupiter.api.Test;
2936

37+
import java.nio.ByteBuffer;
38+
import java.util.LinkedHashMap;
3039
import java.util.List;
3140
import java.util.Map;
3241
import java.util.Set;
@@ -66,6 +75,21 @@ public void testErrorInAllPartitions() {
6675
assertTrue(shareFetch.errorInAllPartitions());
6776
}
6877

78+
@Test
79+
public void testDontCacheAnyData() {
80+
final TopicIdPartition tidp = new TopicIdPartition(Uuid.randomUuid(), 0, "topic");
81+
MemoryRecords records = buildRecords(1L, 3, 1);
82+
83+
ShareFetchResponse shareFetch = shareFetchResponse(tidp, records, Errors.NONE, "", (short) 0,
84+
"", List.of(), 0);
85+
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = shareFetch.responseData(Map.of(tidp.topicId(), tidp.topic()));
86+
assertEquals(1, responseData.size());
87+
responseData.forEach((topicIdPartition, partitionData) -> assertEquals(records, partitionData.records()));
88+
89+
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> nonResponseData = shareFetch.responseData(Map.of());
90+
assertEquals(0, nonResponseData.size());
91+
}
92+
6993
@Test
7094
public void testErrorInAllPartitionsWithMultipleTopicIdPartitions() {
7195
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
@@ -201,4 +225,27 @@ public void testMaybeCompleteWithExceptionWithExistingErroneousTopicPartition()
201225
assertEquals(1, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
202226
assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count());
203227
}
228+
229+
private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
230+
MemoryRecordsBuilder builder = MemoryRecords.builder(
231+
ByteBuffer.allocate(1024), Compression.NONE, TimestampType.CREATE_TIME, baseOffset);
232+
for (int i = 0; i < count; i++)
233+
builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes());
234+
return builder.build();
235+
}
236+
237+
private ShareFetchResponse shareFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error,
238+
String errorMessage, short acknowledgeErrorCode, String acknowledgeErrorMessage,
239+
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, int throttleTime) {
240+
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions = Map.of(tp,
241+
new ShareFetchResponseData.PartitionData()
242+
.setPartitionIndex(tp.topicPartition().partition())
243+
.setErrorCode(error.code())
244+
.setErrorMessage(errorMessage)
245+
.setAcknowledgeErrorCode(acknowledgeErrorCode)
246+
.setAcknowledgeErrorMessage(acknowledgeErrorMessage)
247+
.setRecords(records)
248+
.setAcquiredRecords(acquiredRecords));
249+
return ShareFetchResponse.of(Errors.NONE, throttleTime, new LinkedHashMap<>(partitions), List.of(), Integer.MAX_VALUE);
250+
}
204251
}

0 commit comments

Comments
 (0)