Skip to content

Commit 2d02f1d

Browse files
authored
KAFKA-19084: Port KAFKA-16224, KAFKA-16764 for ShareConsumers (#19369)
Currently for ShareConsumers, if we receive an `UNKNOWN_TOPIC_OR_PARTITION` error code in the `ShareAcknowledgeResponse`, then we retry sending the acknowledgements until the timer expires. We ideally do not want this when a topic/partition is deleted, hence like the `CommitRequestManager`(#15581), we will treat this error as fatal and not retry the acknowledgements. PR also suppresses `InvalidTopicException` during `unsubscribe()` which was also added in the `AsyncKafkaConsumer`(#16043). It was later removed in the regular consumer as they notified the background operations of metadata errors instead of propagating them via `ErrorEvent`. `ShareConsumerImpl` however does not require that change and it still propagates the metadata error back to the application. So we would need to suppress this exception during unsubscribe(). Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan <[email protected]>
1 parent 6dd2cc7 commit 2d02f1d

File tree

4 files changed

+41
-2
lines changed

4 files changed

+41
-2
lines changed

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -971,9 +971,11 @@ private void handlePartitionError(ShareAcknowledgeResponseData.PartitionData par
971971
AtomicBoolean shouldRetry) {
972972
if (partitionError.exception() != null) {
973973
boolean retry = false;
974-
if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH) {
974+
if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH || partitionError == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
975975
// If the leader has changed, there's no point in retrying the operation because the acquisition locks
976976
// will have been released.
977+
// If the topic or partition has been deleted, we do not retry the failed acknowledgements.
978+
// Instead, these records will be re-delivered once they get timed out on the broker.
977979
updateLeaderInfoMap(partitionData, partitionsWithUpdatedLeaderInfo, partitionError, tip.topicPartition());
978980
} else if (partitionError.exception() instanceof RetriableException) {
979981
retry = true;

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.kafka.common.errors.GroupAuthorizationException;
5858
import org.apache.kafka.common.errors.InterruptException;
5959
import org.apache.kafka.common.errors.InvalidGroupIdException;
60+
import org.apache.kafka.common.errors.InvalidTopicException;
6061
import org.apache.kafka.common.errors.TimeoutException;
6162
import org.apache.kafka.common.errors.TopicAuthorizationException;
6263
import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -937,7 +938,7 @@ private void sendAcknowledgementsAndLeaveGroup(final Timer timer, final AtomicRe
937938
// If users have fatal error, they will get some exceptions in the background queue.
938939
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
939940
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException
940-
|| e instanceof TopicAuthorizationException));
941+
|| e instanceof TopicAuthorizationException || e instanceof InvalidTopicException));
941942
log.info("Completed releasing assignment and leaving group to close consumer.");
942943
} catch (TimeoutException e) {
943944
log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +

Diff for: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java

+25
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,31 @@ public void testRetryAcknowledgements() throws InterruptedException {
656656
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
657657
}
658658

659+
@ParameterizedTest
660+
@EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER", "UNKNOWN_TOPIC_OR_PARTITION"})
661+
public void testFatalErrorsAcknowledgementResponse(Errors error) {
662+
buildRequestManager();
663+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
664+
665+
assignFromSubscribed(Collections.singleton(tp0));
666+
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
667+
668+
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
669+
670+
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
671+
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
672+
673+
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
674+
client.prepareResponse(fullAcknowledgeResponse(tip0, error));
675+
networkClientDelegate.poll(time.timer(0));
676+
677+
// Assert these errors are not retried even if they are retriable. They are treated as fatal and a metadata update is triggered.
678+
assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
679+
assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getIncompleteAcknowledgementsCount(tip0));
680+
assertEquals(1, completedAcknowledgements.size());
681+
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
682+
}
683+
659684
@Test
660685
public void testRetryAcknowledgementsMultipleCommitAsync() {
661686
buildRequestManager();

Diff for: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java

+11
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.kafka.common.TopicPartition;
3535
import org.apache.kafka.common.Uuid;
3636
import org.apache.kafka.common.errors.InvalidGroupIdException;
37+
import org.apache.kafka.common.errors.InvalidTopicException;
3738
import org.apache.kafka.common.errors.TimeoutException;
3839
import org.apache.kafka.common.errors.TopicAuthorizationException;
3940
import org.apache.kafka.common.errors.WakeupException;
@@ -331,6 +332,16 @@ public void testUnsubscribeWithTopicAuthorizationException() {
331332
assertDoesNotThrow(() -> consumer.close());
332333
}
333334

335+
@Test
336+
public void testCloseWithInvalidTopicException() {
337+
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
338+
consumer = newConsumer(subscriptions);
339+
340+
backgroundEventQueue.add(new ErrorEvent(new InvalidTopicException(Set.of("!test-topic"))));
341+
completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
342+
assertDoesNotThrow(() -> consumer.close());
343+
}
344+
334345
@Test
335346
public void testCloseWithTopicAuthorizationException() {
336347
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);

0 commit comments

Comments
 (0)