Skip to content

In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic #3834

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
FabioBentoLuiz opened this issue Apr 8, 2025 · 0 comments

Comments

@FabioBentoLuiz
Copy link

In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.4

Describe the bug

Routing of messages to custom DLTs based on thrown exceptions is not working. The listener bean initialization fails with the error:

java.lang.IllegalArgumentException: In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic.

Stacktrace:

Caused by: java.lang.IllegalArgumentException: In the destination topic chain, the type REUSABLE_RETRY_TOPIC can only be specified as the last retry topic.
at org.springframework.util.Assert.isTrue(Assert.java:116)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.validateDestinations(DefaultDestinationTopicResolver.java:256)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver.addDestinationTopics(DefaultDestinationTopicResolver.java:240)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor.lambda$processRegisteredDestinations$1(DefaultDestinationTopicProcessor.java:64)
at java.base/java.util.HashMap$Values.forEach(HashMap.java:1073)
at org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor.processRegisteredDestinations(DefaultDestinationTopicProcessor.java:64)
at org.springframework.kafka.retrytopic.RetryTopicConfigurer.processMainAndRetryListeners(RetryTopicConfigurer.java:321)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:544)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:517)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMultiMethodListeners(KafkaListenerAnnotationBeanPostProcessor.java:486)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:418)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:439)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1815)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:601)
... 41 more

I see that at DefaultDestinationTopicResolver.java:256 an assertion happens:

private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
	for (int i = 0; i < destinationsToAdd.size(); i++) {
		DestinationTopic destination = destinationsToAdd.get(i);
		if (destination.isReusableRetryTopic()) {
			Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
					((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
					String.format("In the destination topic chain, the type %s can only be "
							+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
		}
	}
}

At this point, my destinationsToAdd (see demo application attached) looks like:

Pos Topic Type
0 test-topic MAIN
1 test-topic.retry REUSABLE_RETRY_TOPIC
2 test-topic.custom.dlt DLT
3 test-topic.dlt DLT

As the topic with type REUSABLE_RETRY_TOPIC is not the last or last but one followed by a DLT, the assertion fails.

To Reproduce

Configure a RetryableTopic and set the exceptionBasedDltRouting parameter:

@KafkaListener(topics = "test-topic", id = "test-listener", idIsGroup = false, groupId = "test-group")
@RetryableTopic(
        backoff = @Backoff(delay = 100),
        attempts = "10",
        autoCreateTopics = "false",
        autoStartDltHandler = "true",
        dltTopicSuffix = ".dlt",
        retryTopicSuffix = ".retry",
        exceptionBasedDltRouting = {
                @ExceptionBasedDltDestination(
                        suffix = ".custom",
                        exceptions = {MyCustomException.class}
                )}
)

Expected behavior

The listener bean is created without error and if the process fails multiple times due MyCustomException, the DLT containing the ".custom.dlt" suffix will be considered as the tarrget topic for the message before the general pupose DLT is considered.

Sample

demo1.zip

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant