Skip to content

MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. #19295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

ShivsundarR
Copy link
Collaborator

@ShivsundarR ShivsundarR commented Mar 26, 2025

What
Currently if we received just a control record in the ShareFetchResponse, then the currentFetch in ShareConsumerImpl would not be updated as the record is ignored. But in the process, we lose the acknowledgment for this control record which is a GAP.
PR fixes this by adding an additional map for control record acknowledgements in ShareFetchEvent.
This updates both the ShareConsumerImpl and ShareConsumeRequestManager to accommodate the additional map.
Added a unit test in ShareConsumerImplTest and ShareConsumeRequestManagerTest to verify the changes.

PR also refactors ShareConsumeRequestManagerTest as the Java NCSS check was getting exceeded.

@github-actions github-actions bot added triage PRs from the community consumer clients small Small PRs labels Mar 26, 2025
@AndrewJSchofield AndrewJSchofield added KIP-932 Queues for Kafka ci-approved and removed triage PRs from the community labels Mar 26, 2025
// Fetch more records and send any waiting acknowledgements
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excuse me, is it possible that fetch.takeAcknowledgedRecords() returns non empty records when fetch.isEmpty is true?

Copy link
Collaborator Author

@ShivsundarR ShivsundarR Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of :)) So it would have empty records but could have non-empty acknowledgements (for skipped records).

  • Some integ tests in this PR - KAFKA-16729: Support isolation level for share consumer #19261 revealed that in transactions, when client receives only a control record(eg. an abort marker) in the ShareFetchResponse (without any non-control record), then in the ShareCompletedFetch, these control records are never acknowledged(ideally acknowledged with GAP, indicating the client is ignoring these control records) and are never presented to the consumer application.

  • It is expected that control records are skipped and are not presented to the application, so the records never arrive to the application thread, but client should still acknowledge them with GAP (

    public static final byte ACKNOWLEDGE_TYPE_GAP = (byte) 0;
    )

  • Now these control records are usually auto acknowledged with GAP and will be sent on the next ShareFetch/ShareAcknowledge request. But here as fetch.isEmpty() only checks for numRecords() == 0, when the fetch is empty, we actually ignore the fetch here(meaning we never acknowledge these control records) - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598

  • Now after this PR, any possible acknowledgements that came in with the empty fetch (from control records) to the ShareFetchEvent are added so that it can be sent on the next poll().

  • We cannot present these to the application, so the check for fetch.isEmpty cannot be altered. But yeah there is a case when this could happen.

  • I agree it looks a bit odd though for readability. I will think of a different way of propagating the acks to the background thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ShivsundarR thanks for explanation, and I have a follow-up question.

But here as fetch.isEmpty() only checks for numRecords() == 0, when the fetch is empty, we actually ignore the fetch here(meaning we never acknowledge these control records) -

fetch.isEmpty() means all batches have no inFlightRecords and acknowledgements, right?

    public boolean isEmpty() {
        return inFlightRecords.isEmpty() && acknowledgements.isEmpty();
    }

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java#L116

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is clearly not quite right yet. I want to see the log dump for the situation which causes this and understand the batching.

Copy link
Collaborator Author

@ShivsundarR ShivsundarR Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 So we have ShareFetch and ShareInFlightBatch.
ShareFetch has a Map<TopicIdPartition, ShareInFlightBatch<K, V>> as a member.
ShareFetch::isEmpty does

public boolean isEmpty() {
   return numRecords() == 0;
}

which iterates through the batches and checks ShareInFlightBatch::numRecords which is

int numRecords() {
     return inFlightRecords.size();
}

The code you shared is from ShareInFlightBatch::isEmpty() where it checks both the InFlightRecords and Acknowledgements.

public boolean isEmpty() {
     return inFlightRecords.isEmpty() && acknowledgements.isEmpty();
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which iterates through the batches and checks ShareInFlightBatch::numRecords which is

I see. thanks for this clarification.

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. It's a strange situation but it is a real bug, and made much more likely by the support for read_committed isolation level being added.

I would fix it differently. In ShareConsumerImpl.pollForFetched(Timer), there is a call to collect more data on line 621. That returns a ShareFetch which in this case has no records (it's empty) but it does contain acknowledgements to send. They are being missed.

Then on line 640, there is a call to collect(Collections.emptyMap()). That should not be an empty map in this case. We should be taking the acknowledged records from the supposedly empty batch.

@@ -644,8 +645,12 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
if (currentFetch.isEmpty()) {
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
if (fetch.isEmpty()) {
// Check for any acknowledgements which could have come from control records (GAP) and include them.
Map<TopicIdPartition, NodeAcknowledgements> combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for two ShareFetchResponses to contain only control records for the same partition? If so, we should merge the AcknowledgedRecords for that specific partition

@ShivsundarR
Copy link
Collaborator Author

ShivsundarR commented Mar 27, 2025

@AndrewJSchofield as @chia7712 mentioned if the case arises where successive ShareFetchResponses have only control records (eg. 2 abort markers received in read_commited mode),
and if the second response came in at fetchBuffer.awaitNotEmpty(pollTimer);, then when we process line 641 by sending the initial acknowledgements for control records, then we would get a new set of acknowledgements for control records, but this time, these incoming acknowledgements would be ignored and missed.
Here we might have to combine the acknowledgements.

@github-actions github-actions bot removed the small Small PRs label Mar 28, 2025
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, and the refactoring in the test. Apart from some minor points in the tests, I'm happy with this PR. Almost ready to merge.

@@ -1234,26 +1087,20 @@ public void testRetryAcknowledgementsWithLeaderChange() {
subscriptions.assignFromSubscribed(partitions);

client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 1),
tp -> validLeaderEpoch, topicIds, false));
RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 1),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In quite a few places in this file, the indentation has been increased but the lines have not been changed apart from that. Please can you minimise these changes so the PR is much tighter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, yeah it was some IDE setting which changed the indentation automatically, I missed it.
I have changed this back now.

assertNull(fetchAcksToSend.get(tip0).get(3L));
}

void sendFetchAndVerifyResponse(MemoryRecords records,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be private I think.


void sendFetchAndVerifyResponse(MemoryRecords records,
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords,
Errors... error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are not using the varargs part of this. You could just use Errors error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was one test which needed this, the change was missed when I pushed the commit. I have updated the PR with one usage of this varargs part. In future if we add tests with both top level error and acknowledge error, this might be useful as well.

@AndrewJSchofield
Copy link
Member

Build failure due to excessively flaky test. Test disabled by #19340.

@AndrewJSchofield AndrewJSchofield merged commit e301508 into apache:trunk Apr 1, 2025
25 of 26 checks passed
janchilling pushed a commit to janchilling/kafka that referenced this pull request Apr 4, 2025
…trol records when ShareFetch is empty. (apache#19295)

Currently if we received just a control record in the
`ShareFetchResponse`, then the currentFetch in `ShareConsumerImpl` would
not be updated as the record is ignored. But in the process, we lose the
acknowledgment for this control record which is a GAP.
PR fixes this by adding an additional map for control record
acknowledgements in `ShareFetchEvent`.
This updates both the ShareConsumerImpl and ShareConsumeRequestManager
to accommodate the additional map.
Added a unit test in `ShareConsumerImplTest` and
`ShareConsumeRequestManagerTest` to verify the changes.

Reviewers: Andrew Schofield <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants