Skip to content

Commit aa3589c

Browse files
garyrussellartembilan
authored andcommitted
GH-1409: Fix Nacks for Async Replies
Resolves #1409 Normally, when message has a fatal exception (such as message conversion) `basicNack` with `multiple` true is used, to nack any previously unacked messages (e.g. when using batch size to limit the ack traffic). Even when using manual acks, fatal exceptions are nacked by the container because the user does not have access to the message. However, when using async replies, this has the side effect of nacking unprocessed messages. Detect whether async replies are being used and only nack individual records that cause fatal exceptions. Also, coerce the `AcknowledgeMode` to `MANUAL` for such listners. Add a test for both containers; send a good message followed by a bad one without actually completing the reply future. After the exception occurs and the container is stopped, there should be one messag in the queue. * Remove warning, deprecation; add docs. * Docs. **Cherry-pick to `2.3.x` & `2.2.x`** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/DelegatingInvocableHandler.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/HandlerAdapter.java
1 parent d11841a commit aa3589c

File tree

13 files changed

+340
-31
lines changed

13 files changed

+340
-31
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/MessageListener.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,16 @@ default void containerAckMode(AcknowledgeMode mode) {
4343
// NOSONAR - empty
4444
}
4545

46+
/**
47+
* Return true if this listener is request/reply and the replies are
48+
* async.
49+
* @return true for async replies.
50+
* @since 2.2.21
51+
*/
52+
default boolean isAsyncReplies() {
53+
return false;
54+
}
55+
4656
/**
4757
* Delivers a batch of messages.
4858
* @param messages the messages.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
250250

251251
private volatile boolean lazyLoad;
252252

253+
private boolean asyncReplies;
254+
253255
@Override
254256
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
255257
this.applicationEventPublisher = applicationEventPublisher;
@@ -433,6 +435,7 @@ public void setMessageListener(MessageListener messageListener) {
433435
this.messageListener = messageListener;
434436
this.isBatchListener = messageListener instanceof BatchMessageListener
435437
|| messageListener instanceof ChannelAwareBatchMessageListener;
438+
this.asyncReplies = messageListener.isAsyncReplies();
436439
}
437440

438441
/**
@@ -1009,10 +1012,12 @@ public boolean isPossibleAuthenticationFailureFatal() {
10091012
return this.possibleAuthenticationFailureFatal;
10101013
}
10111014

1012-
10131015
protected boolean isPossibleAuthenticationFailureFatalSet() {
10141016
return this.possibleAuthenticationFailureFatalSet;
10151017
}
1018+
protected boolean isAsyncReplies() {
1019+
return this.asyncReplies;
1020+
}
10161021

10171022
/**
10181023
* Set to true to automatically declare elements (queues, exchanges, bindings)
@@ -1193,6 +1198,9 @@ public final void afterPropertiesSet() {
11931198
catch (IllegalStateException e) {
11941199
this.logger.debug("Could not enable micrometer timers", e);
11951200
}
1201+
if (this.isAsyncReplies() && !AcknowledgeMode.MANUAL.equals(this.acknowledgeMode)) {
1202+
this.acknowledgeMode = AcknowledgeMode.MANUAL;
1203+
}
11961204
}
11971205

11981206
@Override

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,17 @@ public synchronized void stop() {
758758
* @param ex the thrown application exception or error
759759
*/
760760
public void rollbackOnExceptionIfNecessary(Throwable ex) {
761+
rollbackOnExceptionIfNecessary(ex, -1);
762+
}
763+
764+
/**
765+
* Perform a rollback, handling rollback exceptions properly.
766+
* @param ex the thrown application exception or error
767+
* @param tag delivery tag; when specified (greater than or equal to 0) only that
768+
* message is nacked.
769+
* @since 2.2.21.
770+
*/
771+
public void rollbackOnExceptionIfNecessary(Throwable ex, long tag) {
761772

762773
boolean ackRequired = !this.acknowledgeMode.isAutoAck()
763774
&& (!this.acknowledgeMode.isManual() || ContainerUtils.isRejectManual(ex));
@@ -769,14 +780,20 @@ public void rollbackOnExceptionIfNecessary(Throwable ex) {
769780
RabbitUtils.rollbackIfNecessary(this.channel);
770781
}
771782
if (ackRequired) {
772-
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
773-
if (deliveryTag.isPresent()) {
774-
this.channel.basicNack(deliveryTag.getAsLong(), true,
775-
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
783+
if (tag < 0) {
784+
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
785+
if (deliveryTag.isPresent()) {
786+
this.channel.basicNack(deliveryTag.getAsLong(), true,
787+
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
788+
}
789+
if (this.transactional) {
790+
// Need to commit the reject (=nack)
791+
RabbitUtils.commitIfNecessary(this.channel);
792+
}
776793
}
777-
if (this.transactional) {
778-
// Need to commit the reject (=nack)
779-
RabbitUtils.commitIfNecessary(this.channel);
794+
else {
795+
this.channel.basicNack(tag, false,
796+
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
780797
}
781798
}
782799
}
@@ -785,7 +802,12 @@ public void rollbackOnExceptionIfNecessary(Throwable ex) {
785802
throw RabbitExceptionTranslator.convertRabbitAccessException(e); // NOSONAR stack trace loss
786803
}
787804
finally {
788-
this.deliveryTags.clear();
805+
if (tag < 0) {
806+
this.deliveryTags.clear();
807+
}
808+
else {
809+
this.deliveryTags.remove(tag);
810+
}
789811
}
790812
}
791813

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1196,7 +1196,7 @@ private void rollback(long deliveryTag, Exception e) {
11961196
}
11971197
}
11981198
}
1199-
getChannel().basicNack(deliveryTag, true,
1199+
getChannel().basicNack(deliveryTag, !isAsyncReplies(),
12001200
ContainerUtils.shouldRequeue(isDefaultRequeueRejected(), e, this.logger));
12011201
}
12021202
catch (IOException e1) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,9 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
991991
}
992992
break;
993993
}
994+
long tagToRollback = isAsyncReplies()
995+
? message.getMessageProperties().getDeliveryTag()
996+
: -1;
994997
if (getTransactionManager() != null) {
995998
if (getTransactionAttribute().rollbackOn(ex)) {
996999
RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
@@ -1003,7 +1006,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
10031006
* If we don't actually have a transaction, we have to roll back
10041007
* manually. See prepareHolderForRollback().
10051008
*/
1006-
consumer.rollbackOnExceptionIfNecessary(ex);
1009+
consumer.rollbackOnExceptionIfNecessary(ex, tagToRollback);
10071010
}
10081011
throw ex; // encompassing transaction will handle the rollback.
10091012
}
@@ -1015,7 +1018,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
10151018
}
10161019
}
10171020
else {
1018-
consumer.rollbackOnExceptionIfNecessary(ex);
1021+
consumer.rollbackOnExceptionIfNecessary(ex, tagToRollback);
10191022
throw ex;
10201023
}
10211024
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.lang.reflect.Type;
2222
import java.lang.reflect.WildcardType;
2323
import java.util.Arrays;
24-
import java.util.function.Consumer;
2524

2625
import org.apache.commons.logging.Log;
2726
import org.apache.commons.logging.LogFactory;
@@ -56,7 +55,6 @@
5655
import org.springframework.util.concurrent.ListenableFuture;
5756

5857
import com.rabbitmq.client.Channel;
59-
import reactor.core.publisher.Mono;
6058

6159
/**
6260
* An abstract {@link org.springframework.amqp.core.MessageListener} adapter providing the
@@ -81,7 +79,7 @@ public abstract class AbstractAdaptableMessageListener implements ChannelAwareMe
8179

8280
private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
8381

84-
private static final boolean monoPresent = // NOSONAR - lower case
82+
static final boolean monoPresent = // NOSONAR - lower case, protected
8583
ClassUtils.isPresent("reactor.core.publisher.Mono", ChannelAwareMessageListener.class.getClassLoader());
8684

8785
/**
@@ -635,19 +633,4 @@ public Object getResult() {
635633

636634
}
637635

638-
private static class MonoHandler { // NOSONAR - pointless to name it ..Utils|Helper
639-
640-
static boolean isMono(Object result) {
641-
return result instanceof Mono;
642-
}
643-
644-
@SuppressWarnings("unchecked")
645-
static void subscribe(Object returnValue, Consumer<? super Object> success,
646-
Consumer<? super Throwable> failure, Runnable completeConsumer) {
647-
648-
((Mono<? super Object>) returnValue).subscribe(success, failure, completeConsumer);
649-
}
650-
651-
}
652-
653636
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.lang.reflect.Method;
2121
import java.util.ArrayList;
2222
import java.util.Arrays;
23+
import java.util.Iterator;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +42,7 @@
4142
import org.springframework.messaging.handler.annotation.SendTo;
4243
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
4344
import org.springframework.util.Assert;
45+
import org.springframework.util.concurrent.ListenableFuture;
4446

4547

4648
/**
@@ -75,6 +77,8 @@ public class DelegatingInvocableHandler {
7577

7678
private final BeanExpressionContext beanExpressionContext;
7779

80+
private final boolean asyncReplies;
81+
7882
/**
7983
* Construct an instance with the supplied handlers for the bean.
8084
* @param handlers the handlers.
@@ -106,6 +110,18 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
106110
this.bean = bean;
107111
this.resolver = beanExpressionResolver;
108112
this.beanExpressionContext = beanExpressionContext;
113+
boolean asyncReplies;
114+
asyncReplies = defaultHandler != null && isAsyncReply(defaultHandler);
115+
Iterator<InvocableHandlerMethod> iterator = handlers.iterator();
116+
while (iterator.hasNext()) {
117+
asyncReplies |= isAsyncReply(iterator.next());
118+
}
119+
this.asyncReplies = asyncReplies;
120+
}
121+
122+
private boolean isAsyncReply(InvocableHandlerMethod method) {
123+
return (AbstractAdaptableMessageListener.monoPresent && MonoHandler.isMono(method.getMethod().getReturnType()))
124+
|| ListenableFuture.class.isAssignableFrom(method.getMethod().getReturnType());
109125
}
110126

111127
/**
@@ -115,6 +131,15 @@ public Object getBean() {
115131
return this.bean;
116132
}
117133

134+
/**
135+
* Return true if any handler method has an async reply type.
136+
* @return the asyncReply.
137+
* @since 2.2.21
138+
*/
139+
public boolean isAsyncReplies() {
140+
return this.asyncReplies;
141+
}
142+
118143
/**
119144
* Invoke the method with the given message.
120145
* @param message the message.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/HandlerAdapter.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.lang.Nullable;
2323
import org.springframework.messaging.Message;
2424
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
25+
import org.springframework.util.concurrent.ListenableFuture;
2526

2627
/**
2728
* A wrapper for either an {@link InvocableHandlerMethod} or
@@ -38,14 +39,24 @@ public class HandlerAdapter {
3839

3940
private final DelegatingInvocableHandler delegatingHandler;
4041

42+
private final boolean asyncReplies;
43+
44+
/**
45+
* Construct an instance with the provided method.
46+
* @param invokerHandlerMethod the method.
47+
*/
4148
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
4249
this.invokerHandlerMethod = invokerHandlerMethod;
4350
this.delegatingHandler = null;
51+
this.asyncReplies = (AbstractAdaptableMessageListener.monoPresent
52+
&& MonoHandler.isMono(invokerHandlerMethod.getMethod().getReturnType()))
53+
|| ListenableFuture.class.isAssignableFrom(invokerHandlerMethod.getMethod().getReturnType());
4454
}
4555

4656
public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) {
4757
this.invokerHandlerMethod = null;
4858
this.delegatingHandler = delegatingHandler;
59+
this.asyncReplies = delegatingHandler.isAsyncReplies();
4960
}
5061

5162
public InvocationResult invoke(Message<?> message, Object... providedArgs) throws Exception { // NOSONAR
@@ -126,6 +137,22 @@ public Object getBean() {
126137
}
127138
}
128139

140+
/**
141+
* Return true if any handler method has an async reply type.
142+
* @return the asyncReply.
143+
* @since 2.2.21
144+
*/
145+
public boolean isAsyncReplies() {
146+
return this.asyncReplies;
147+
}
148+
149+
/**
150+
* Build an {@link InvocationResult} for the result and inbound payload.
151+
* @param result the result.
152+
* @param inboundPayload the payload.
153+
* @return the invocation result.
154+
* @since 2.1.7
155+
*/
129156
@Nullable
130157
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
131158
if (this.invokerHandlerMethod != null) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ public void setHandlerAdapter(HandlerAdapter handlerAdapter) {
103103
this.handlerAdapter = handlerAdapter;
104104
}
105105

106+
@Override
107+
public boolean isAsyncReplies() {
108+
return this.handlerAdapter.isAsyncReplies();
109+
}
110+
106111
/**
107112
* Set the {@link AmqpHeaderMapper} implementation to use to map the standard
108113
* AMQP headers. By default, a {@link org.springframework.amqp.support.SimpleAmqpHeaderMapper
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener.adapter;
18+
19+
import java.util.function.Consumer;
20+
21+
import reactor.core.publisher.Mono;
22+
23+
/**
24+
* Class to prevent direct links to {@link Mono}.
25+
* @author Gary Russell
26+
* @since 2.2.21
27+
*/
28+
final class MonoHandler { // NOSONAR - pointless to name it ..Utils|Helper
29+
30+
private MonoHandler() {
31+
}
32+
33+
static boolean isMono(Object result) {
34+
return result instanceof Mono;
35+
}
36+
37+
@SuppressWarnings("unchecked")
38+
static void subscribe(Object returnValue, Consumer<? super Object> success,
39+
Consumer<? super Throwable> failure, Runnable completeConsumer) {
40+
41+
((Mono<? super Object>) returnValue).subscribe(success, failure, completeConsumer);
42+
}
43+
44+
}

0 commit comments

Comments
 (0)