Skip to content

Commit 90f8aa0

Browse files
garyrussellartembilan
authored andcommitted
AMQP-833: DLQ and Expiry - Discard
JIRA: https://jira.spring.io/browse/AMQP-833 A common pattern is to add time-to-live to a DLQ and route expired messages back to the main queue. If a message fails with a fatal exception, such messages will cycle around for ever; the application has no mechanism to look at the `x-death` header. Add logic to the `ConditionalRejectingErrorHandler` to look for an `x-death` header and discard the message completely by throwing an `ImmediateAcknowledgeAmqpException`. Add a property to the error handler to revert to the previous behavior. * Polishing Docs according PR comments
1 parent 91f79aa commit 90f8aa0

File tree

7 files changed

+188
-13
lines changed

7 files changed

+188
-13
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.lang.reflect.Type;
2222
import java.util.Date;
2323
import java.util.HashMap;
24+
import java.util.List;
2425
import java.util.Map;
2526

2627
/**
@@ -516,6 +517,20 @@ public void setTargetBean(Object targetBean) {
516517
this.targetBean = targetBean;
517518
}
518519

520+
/**
521+
* Return the x-death header.
522+
* @return the header.
523+
*/
524+
@SuppressWarnings("unchecked")
525+
public List<Map<String, ?>> getXDeathHeader() {
526+
try {
527+
return (List<Map<String, ?>>) this.headers.get("x-death");
528+
}
529+
catch (Exception e) {
530+
return null;
531+
}
532+
}
533+
519534
@Override
520535
public int hashCode() {
521536
final int prime = 31;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ConditionalRejectingErrorHandler.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@
1616

1717
package org.springframework.amqp.rabbit.listener;
1818

19+
import java.util.List;
20+
import java.util.Map;
21+
1922
import org.apache.commons.logging.Log;
2023
import org.apache.commons.logging.LogFactory;
2124

2225
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
26+
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
27+
import org.springframework.amqp.core.Message;
2328
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
2429
import org.springframework.amqp.support.converter.MessageConversionException;
2530
import org.springframework.messaging.MessagingException;
@@ -54,6 +59,8 @@ public class ConditionalRejectingErrorHandler implements ErrorHandler {
5459

5560
private final FatalExceptionStrategy exceptionStrategy;
5661

62+
private boolean discardFatalsWithXDeath = true;
63+
5764
/**
5865
* Create a handler with the {@link ConditionalRejectingErrorHandler.DefaultExceptionStrategy}.
5966
*/
@@ -69,10 +76,33 @@ public ConditionalRejectingErrorHandler(FatalExceptionStrategy exceptionStrategy
6976
this.exceptionStrategy = exceptionStrategy;
7077
}
7178

79+
/**
80+
* Set to false to disable the (now) default behavior of logging and discarding
81+
* messages that cause fatal exceptions and have an `x-death` header; which
82+
* usually means that the message has been republished after previously being
83+
* sent to a DLQ.
84+
* @param discardFatalsWithXDeath false to disable.
85+
* @since 2.1
86+
*/
87+
public void setDiscardFatalsWithXDeath(boolean discardFatalsWithXDeath) {
88+
this.discardFatalsWithXDeath = discardFatalsWithXDeath;
89+
}
90+
7291
@Override
7392
public void handleError(Throwable t) {
7493
log(t);
7594
if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
95+
if (this.discardFatalsWithXDeath && t instanceof ListenerExecutionFailedException) {
96+
Message failed = ((ListenerExecutionFailedException) t).getFailedMessage();
97+
if (failed != null) {
98+
List<Map<String, ?>> xDeath = failed.getMessageProperties().getXDeathHeader();
99+
if (xDeath != null && xDeath.size() > 0) {
100+
this.logger.error("x-death header detected on a message with a fatal exception; "
101+
+ "perhaps requeued from a DLQ? - discarding: " + failed);
102+
throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present");
103+
}
104+
}
105+
}
76106
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
77107
}
78108
}
@@ -121,16 +151,16 @@ public boolean isFatal(Throwable t) {
121151
Throwable cause = t.getCause();
122152
while (cause instanceof MessagingException
123153
&& !(cause instanceof
124-
org.springframework.messaging.converter.MessageConversionException)
154+
org.springframework.messaging.converter.MessageConversionException)
125155
&& !(cause instanceof MethodArgumentResolutionException)) {
126156
cause = cause.getCause();
127157
}
128158
if (t instanceof ListenerExecutionFailedException && isCauseFatal(cause)) {
129159
if (this.logger.isWarnEnabled()) {
130160
this.logger.warn(
131161
"Fatal message conversion error; message rejected; "
132-
+ "it will be dropped or routed to a dead letter exchange, if so configured: "
133-
+ ((ListenerExecutionFailedException) t).getFailedMessage());
162+
+ "it will be dropped or routed to a dead letter exchange, if so configured: "
163+
+ ((ListenerExecutionFailedException) t).getFailedMessage());
134164
}
135165
return true;
136166
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.amqp.core.Message;
27+
import org.springframework.amqp.core.Queue;
28+
import org.springframework.amqp.core.QueueBuilder;
29+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
30+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
31+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
32+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
33+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
34+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
35+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
36+
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
37+
import org.springframework.amqp.support.converter.MessageConversionException;
38+
import org.springframework.beans.factory.annotation.Autowired;
39+
import org.springframework.context.annotation.Bean;
40+
import org.springframework.context.annotation.Configuration;
41+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
42+
43+
/**
44+
* @author Gary Russell
45+
* @since 2.1
46+
*
47+
*/
48+
@RabbitAvailable
49+
@SpringJUnitConfig
50+
public class DlqExpiryTests {
51+
52+
@Autowired
53+
private Config config;
54+
55+
@Test
56+
public void testExpiredDies() throws Exception {
57+
this.config.template().convertAndSend("test.expiry.main", "foo");
58+
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue();
59+
Thread.sleep(300);
60+
assertThat(this.config.counter).isEqualTo(2);
61+
this.config.admin().deleteQueue("test.expiry.dlq");
62+
}
63+
64+
@Configuration
65+
@EnableRabbit
66+
public static class Config {
67+
68+
private int counter;
69+
70+
private final CountDownLatch latch = new CountDownLatch(2);
71+
72+
@Bean
73+
public CachingConnectionFactory ccf() {
74+
return new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
75+
}
76+
77+
@Bean
78+
public RabbitAdmin admin() {
79+
return new RabbitAdmin(ccf());
80+
}
81+
82+
@Bean
83+
public RabbitTemplate template() {
84+
return new RabbitTemplate(ccf());
85+
}
86+
87+
@Bean
88+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
89+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
90+
factory.setConnectionFactory(ccf());
91+
return factory;
92+
}
93+
94+
@Bean
95+
public Queue main() {
96+
return QueueBuilder.nonDurable("test.expiry.main").autoDelete()
97+
.withArgument("x-dead-letter-exchange", "")
98+
.withArgument("x-dead-letter-routing-key", "test.expiry.dlq")
99+
.build();
100+
}
101+
102+
@Bean
103+
public Queue dlq() {
104+
return QueueBuilder.nonDurable("test.expiry.dlq").autoDelete()
105+
.withArgument("x-dead-letter-exchange", "")
106+
.withArgument("x-dead-letter-routing-key", "test.expiry.main")
107+
.withArgument("x-message-ttl", 100)
108+
.build();
109+
}
110+
111+
@RabbitListener(queues = "test.expiry.main")
112+
public void listen(Message in) {
113+
this.latch.countDown();
114+
this.counter++;
115+
throw new MessageConversionException("test.expiry");
116+
}
117+
118+
}
119+
120+
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ErrorHandlerTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,11 +54,11 @@ public void testFatalsAreRejected() throws Exception {
5454
willDoNothing().given(logger).warn(anyString(), any(Throwable.class));
5555
new DirectFieldAccessor(handler).setPropertyValue("logger", logger);
5656
handler.handleError(new ListenerExecutionFailedException("intended", new RuntimeException(),
57-
mock(org.springframework.amqp.core.Message.class)));
57+
new org.springframework.amqp.core.Message("".getBytes(), new MessageProperties())));
5858

5959
try {
6060
handler.handleError(new ListenerExecutionFailedException("intended", new MessageConversionException(""),
61-
mock(org.springframework.amqp.core.Message.class)));
61+
new org.springframework.amqp.core.Message("".getBytes(), new MessageProperties())));
6262
fail("Expected exception");
6363
}
6464
catch (AmqpRejectAndDontRequeueException e) {
@@ -67,7 +67,7 @@ public void testFatalsAreRejected() throws Exception {
6767
try {
6868
handler.handleError(new ListenerExecutionFailedException("intended",
6969
new org.springframework.messaging.converter.MessageConversionException(""),
70-
mock(org.springframework.amqp.core.Message.class)));
70+
new org.springframework.amqp.core.Message("".getBytes(), new MessageProperties())));
7171
fail("Expected exception");
7272
}
7373
catch (AmqpRejectAndDontRequeueException e) {
@@ -78,7 +78,7 @@ public void testFatalsAreRejected() throws Exception {
7878
try {
7979
handler.handleError(new ListenerExecutionFailedException("intended",
8080
new MethodArgumentNotValidException(message, mp),
81-
mock(org.springframework.amqp.core.Message.class)));
81+
new org.springframework.amqp.core.Message("".getBytes(), new MessageProperties())));
8282
fail("Expected exception");
8383
}
8484
catch (AmqpRejectAndDontRequeueException e) {
@@ -87,7 +87,7 @@ public void testFatalsAreRejected() throws Exception {
8787
try {
8888
handler.handleError(new ListenerExecutionFailedException("intended",
8989
new MethodArgumentTypeMismatchException(message, mp, ""),
90-
mock(org.springframework.amqp.core.Message.class)));
90+
new org.springframework.amqp.core.Message("".getBytes(), new MessageProperties())));
9191
fail("Expected exception");
9292
}
9393
catch (AmqpRejectAndDontRequeueException e) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.amqp.core.Message;
5252
import org.springframework.amqp.core.MessageBuilder;
5353
import org.springframework.amqp.core.MessageListener;
54+
import org.springframework.amqp.core.MessageProperties;
5455
import org.springframework.amqp.core.Queue;
5556
import org.springframework.amqp.core.QueueBuilder;
5657
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@@ -264,7 +265,7 @@ public void testRejectingErrorHandler() throws Exception {
264265
container.stop();
265266

266267
Exception e = new ListenerExecutionFailedException("foo", new MessageConversionException("bar"),
267-
mock(Message.class));
268+
new Message("".getBytes(), new MessageProperties()));
268269
try {
269270
eh.handleError(e);
270271
fail("expected exception");

src/reference/asciidoc/amqp.adoc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3824,7 +3824,7 @@ public class RabbitServerConfiguration extends AbstractStockAppRabbitConfigurati
38243824
----
38253825

38263826
This is the end of the whole inheritance chain of `@Configuration` classes.
3827-
The end result is the the TopicExchange and Queue will be declared to the broker upon application startup.
3827+
The end result is the `TopicExchange` and `Queue` will be declared to the broker upon application startup.
38283828
There is no binding of the TopicExchange to a queue in the server configuration, as that is done in the client application.
38293829
The stock request queue however is automatically bound to the AMQP default exchange - this behavior is defined by the specification.
38303830

@@ -4278,7 +4278,11 @@ In addition, the `ListenerExecutionFailedException` now has a `failedMessage` pr
42784278
If the `FatalExceptionStrategy.isFatal()` method returns `true`, the error handler throws an `AmqpRejectAndDontRequeueException`.
42794279
The default `FatalExceptionStrategy` logs a warning message when an exception is determined to be fatal.
42804280

4281-
Since _version 1.6.3_ a convenient way to add user exceptions to the fatal list is to subclass `ConditionalRejectingErrorHandler.DefaultExceptionStrategy` and override the method `isUserCauseFatal(Throwable cause)` to return true for fatal exceptions.
4281+
Since version 1.6.3, a convenient way to add user exceptions to the fatal list is to subclass `ConditionalRejectingErrorHandler.DefaultExceptionStrategy` and override the method `isUserCauseFatal(Throwable cause)` to return true for fatal exceptions.
4282+
4283+
A common pattern for handling DLQ messages is to set a `time-to-live` on those messages as well as additional DLQ configuration such that these messages expire and are routed back to the main queue for retry.
4284+
The problem with this technique is that messages that cause fatal exceptions will loop forever.
4285+
Starting with version 2.1, the `ConditionalRejectingErrorHandler` detects an `x-death` header on a message that causes a fatal exception to be thrown, the message will be logged and discarded.
42824286

42834287
[[transactions]]
42844288
==== Transactions

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,9 @@ See <<async-return>> for more information.
7373

7474
===== Connection Factory Bean Changes
7575

76-
The `RabbitConnectionFactoryBean` now calls `enableHostnameVerification()` by default; to revert to the previous behavior, set the `enabaleHostnameVerification` property to `false`.
76+
The `RabbitConnectionFactoryBean` now calls `enableHostnameVerification()` by default; to revert to the previous behavior, set the `enableHostnameVerification` property to `false`.
77+
78+
===== Listener Container Changes
79+
80+
The default `ConditionalRejectingErrorHandler` will now completely discard messages that cause fatal errors if an `x-death` header is present.
81+
See <<exception-handling>> for more information.

0 commit comments

Comments
 (0)