Trace context is being reset in between retry attempts and dlt publication #3816

Open
msatmarean18 opened this issue Mar 21, 2025 · 1 comment

@msatmarean18

In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.0

Describe the bug

I'm using RetryTopicConfiguration to configure non-blocking retries for my consumer.
After upgrading to Spring Boot 3.4.0, I noticed that a new traceId is created with each retry attempt. The Kafka message also accumulates an additional traceparent header with each retry: after 3 retries, the message carries 3 traceparent headers.
This was not the case in previous versions of Spring for Apache Kafka.

Looking at the logs, I think the trace context is being reset after the exception is thrown from the @KafkaListener-annotated method.

3.2.8

[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] c.m.e.k.Kafka.Consumer    : Consuming message
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] k.r.DeadLetterPublishingRecovererFactory : Resolved topic: completed.retry
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] k.r.DeadLetterPublishingRecovererFactory : Record: topic = completed, partition = 0, offset = 58, main topic = completed threw an error at topic completed. Sending to retry topic completed.retry.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2874)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2815)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2779)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2702)
.
.
.
[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-ac9768534aa47b94] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:

3.3.0

[ntainer#0-0-C-1] [9fc4895742222ff70aec850c2cb4311d-f30073bf3bd3c48c] c.m.e.k.Kafka.Consumer    : Consuming message
[ntainer#0-0-C-1] [                                                 ] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
[ntainer#0-0-C-1] [                                                 ] k.r.DeadLetterPublishingRecovererFactory : Resolved topic: completed.retry
[ntainer#0-0-C-1] [                                                 ] k.r.DeadLetterPublishingRecovererFactory : Record: topic = completed, partition = 0, offset = 47, main topic = completed threw an error at topic completed. Sending to retry topic completed.retry.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2986)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2889)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2853)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2766)
.
.
.

[ntainer#0-0-C-1] [44c2097849a6467cfeaadddb4b32b9ae-b0ac20860d36c3ba] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
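The difference between the two log excerpts can be checked mechanically. The following is a minimal sketch, assuming the bracketed correlation field has the form `[traceId-spanId]` (32 and 16 lowercase hex characters), as in the lines above. The class `LogTraceCheck` and its methods are hypothetical helpers written for this comparison and are not part of Spring for Apache Kafka.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the trace ID from log lines like the ones above.
class LogTraceCheck {

    // Matches "[<32 hex traceId>-<16 hex spanId>]"; the thread-name bracket
    // and the blank correlation bracket seen in the 3.3.0 log do not match.
    private static final Pattern CORRELATION =
            Pattern.compile("\\[([0-9a-f]{32})-([0-9a-f]{16})\\]");

    // Returns the trace ID of a log line, or empty if the line has no trace context.
    static Optional<String> traceId(String logLine) {
        Matcher m = CORRELATION.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // True only if both lines carry a trace context and the trace IDs are equal.
    static boolean sameTrace(String first, String second) {
        Optional<String> id = traceId(first);
        return id.isPresent() && id.equals(traceId(second));
    }

    public static void main(String[] args) {
        String consume328 = "[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-4e11f19167499c04] c.m.e.k.Kafka.Consumer    : Consuming message";
        String produce328 = "[ntainer#0-0-C-1] [e05006e2c016aab51e0ea8fd9f856c3a-ac9768534aa47b94] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:";
        String consume330 = "[ntainer#0-0-C-1] [9fc4895742222ff70aec850c2cb4311d-f30073bf3bd3c48c] c.m.e.k.Kafka.Consumer    : Consuming message";
        String produce330 = "[ntainer#0-0-C-1] [44c2097849a6467cfeaadddb4b32b9ae-b0ac20860d36c3ba] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:";

        System.out.println(sameTrace(consume328, produce328)); // true: same trace, new span
        System.out.println(sameTrace(consume330, produce330)); // false: a new trace was started
    }
}
```

With 3.2.8 the consumer and producer lines share a trace ID and differ only in span ID. With 3.3.0 the producer line carries a different trace ID, and the lines in between carry none at all.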

To Reproduce

Create a RetryTopicConfiguration bean:

    @Bean
    public RetryTopicConfiguration kafkaRetryConfiguration(
            KafkaTemplate<Object, Object> kafkaTemplate) {

        return RetryTopicConfigurationBuilder
                .newInstance()
                .fixedBackOff(consumerRetryProperties.getBackoff())
                .useSingleTopicForSameIntervals()
                .maxAttempts(consumerRetryProperties.getAttempts())
                .includeTopics(consumerRetryProperties.getTopics())
                .retryTopicSuffix(consumerRetryProperties.getRetryTopicSuffix())
                .dltSuffix(consumerRetryProperties.getErrorTopicSuffix())
                .doNotRetryOnDltFailure()
                .retryOn(consumerRetryProperties.getRetryOn())
                .dltHandlerMethod(consumerRetryProperties.getDltBeanName(), consumerRetryProperties.getDltMethod())
                .doNotAutoCreateRetryTopics()
                .create(kafkaTemplate);
    }

Create a listener method.

    @KafkaListener(
            topics = {"${my.kafka.topics.completed}"},
            clientIdPrefix = "${my.kafka.client-id-prefix}"
    )
    public void listen(@Payload ConsumerRecord<String, PayloadModel> consumerRecord) {
        // consumer logic
    }

Throw a retryable exception (one matched by the retryOn setting) from the listen method.

Expected behavior

The traceId should be propagated to each retry attempt and to the DLT publication.
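The expectation can be written as a check on the traceparent header values of a retried record. This is a minimal sketch, assuming the W3C Trace Context layout `version-traceId-parentId-flags` and assuming that a correctly propagated record carries exactly one traceparent header. `TraceparentCheck` is a hypothetical helper, and the `00` version and `01` flags in the sample values are illustrative; only the trace and span IDs are taken from the logs above.

```java
import java.util.List;

// Hypothetical helper: checks whether a record's traceparent headers continue a given trace.
class TraceparentCheck {

    // Extracts the trace ID (second field) from a W3C traceparent value.
    static String traceIdOf(String traceparent) {
        String[] parts = traceparent.trim().split("-");
        if (parts.length < 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            throw new IllegalArgumentException("Not a traceparent value: " + traceparent);
        }
        return parts[1];
    }

    // Assumed expectation: a single traceparent header whose trace ID equals the original one.
    static boolean continuesTrace(String originalTraceId, List<String> traceparentValues) {
        return traceparentValues.size() == 1
                && traceIdOf(traceparentValues.get(0)).equals(originalTraceId);
    }

    public static void main(String[] args) {
        // 3.2.8-like case: one header, same trace ID, new span ID.
        String original = "e05006e2c016aab51e0ea8fd9f856c3a";
        List<String> propagated = List.of("00-" + original + "-ac9768534aa47b94-01");

        // 3.3.0-like case: headers accumulate and the trace ID changes.
        String original330 = "9fc4895742222ff70aec850c2cb4311d";
        List<String> accumulated = List.of(
                "00-9fc4895742222ff70aec850c2cb4311d-f30073bf3bd3c48c-01",
                "00-44c2097849a6467cfeaadddb4b32b9ae-b0ac20860d36c3ba-01");

        System.out.println(continuesTrace(original, propagated));     // true
        System.out.println(continuesTrace(original330, accumulated)); // false
    }
}
```

In a real listener, the values could be collected from the record with the Kafka client's `record.headers().headers("traceparent")` and decoded as UTF-8 strings before being passed to `continuesTrace`.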

Sample

N/A

@artembilan
Member

Any chance you could share a simple project with us so we can reproduce this on our side?
Thanks
