Skip to content

Commit 9f0c756

Browse files
ShivsundarRjanchilling
authored andcommitted
MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. (apache#19295)
Currently if we received just a control record in the `ShareFetchResponse`, then the currentFetch in `ShareConsumerImpl` would not be updated as the record is ignored. But in the process, we lose the acknowledgment for this control record which is a GAP. PR fixes this by adding an additional map for control record acknowledgements in `ShareFetchEvent`. This updates both the ShareConsumerImpl and ShareConsumeRequestManager to accommodate the additional map. Added a unit test in `ShareConsumerImplTest` and `ShareConsumeRequestManagerTest` to verify the changes. Reviewers: Andrew Schofield <[email protected]>
1 parent 17be27e commit 9f0c756

File tree

6 files changed

+165
-241
lines changed

6 files changed

+165
-241
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,19 @@ public PollResult poll(long currentTimeMs) {
256256
return new PollResult(requests);
257257
}
258258

259-
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
259+
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
260+
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
260261
if (!fetchMoreRecords) {
261262
log.debug("Fetch more data");
262263
fetchMoreRecords = true;
263264
}
264265

265-
// The acknowledgements sent via ShareFetch are stored in this map.
266+
// Process both acknowledgement maps and sends them in the next ShareFetch.
267+
processAcknowledgementsMap(acknowledgementsMap);
268+
processAcknowledgementsMap(controlRecordAcknowledgements);
269+
}
270+
271+
private void processAcknowledgementsMap(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
266272
acknowledgementsMap.forEach((tip, nodeAcks) -> {
267273
int nodeId = nodeAcks.nodeId();
268274
Map<TopicIdPartition, Acknowledgements> currentNodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(nodeId);
@@ -1474,4 +1480,8 @@ public String toString() {
14741480
return super.toString().toLowerCase(Locale.ROOT);
14751481
}
14761482
}
1483+
1484+
Map<TopicIdPartition, Acknowledgements> getFetchAcknowledgementsToSend(Integer nodeId) {
1485+
return fetchAcknowledgementsToSend.get(nodeId);
1486+
}
14771487
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -649,8 +649,8 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
649649
if (currentFetch.isEmpty()) {
650650
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
651651
if (fetch.isEmpty()) {
652-
// Fetch more records and send any waiting acknowledgements
653-
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
652+
// Check for any acknowledgements which could have come from control records (GAP) and include them.
653+
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap, fetch.takeAcknowledgedRecords()));
654654

655655
// Notify the network thread to wake up and start the next round of fetching
656656
applicationEventHandler.wakeupNetworkThread();

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ private void process(@SuppressWarnings("unused") final StopFindCoordinatorOnClos
482482
* Process event that tells the share consume request manager to fetch more records.
483483
*/
484484
private void process(final ShareFetchEvent event) {
485-
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch(event.acknowledgementsMap()));
485+
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch(event.acknowledgementsMap(), event.controlRecordAcknowledgements()));
486486
}
487487

488488
/**

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,23 @@ public class ShareFetchEvent extends ApplicationEvent {
2525

2626
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;
2727

28-
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
28+
private final Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements;
29+
30+
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
31+
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
2932
super(Type.SHARE_FETCH);
3033
this.acknowledgementsMap = acknowledgementsMap;
34+
this.controlRecordAcknowledgements = controlRecordAcknowledgements;
3135
}
3236

3337
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
3438
return acknowledgementsMap;
3539
}
3640

41+
public Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements() {
42+
return controlRecordAcknowledgements;
43+
}
44+
3745
@Override
3846
protected String toStringBase() {
3947
return super.toStringBase() + ", acknowledgementsMap=" + acknowledgementsMap;

0 commit comments

Comments
 (0)