Skip to content

Commit 88d98df

Browse files
garyrussell authored and artembilan committed
GH-1230: Fix return/confirm delivery order
Resolves #1230. We must guarantee that returns are delivered before confirms. #1055 changed the logic to deliver on a different thread to avoid a deadlock in the amqp-client. Since the executor service might have multiple threads, we added a latch to ensure the return is delivered first. Unfortunately, this was a single one-shot latch held by the channel: once it had been counted down by the first return it no longer blocked, so subsequent deliveries using the same channel might arrive in the wrong order. Move the latch to the `PendingConfirm` to reinstate in-order delivery. **cherry-pick to 2.2.x**
1 parent 78d2eac commit 88d98df

File tree

3 files changed

+148
-15
lines changed

3 files changed

+148
-15
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java

Lines changed: 47 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.amqp.rabbit.connection;
1818

19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
1922
import org.springframework.lang.Nullable;
2023

2124
/**
@@ -29,13 +32,19 @@
2932
*/
3033
public class PendingConfirm {
3134

35+
static final long RETURN_CALLBACK_TIMEOUT = 60;
36+
3237
@Nullable
3338
private final CorrelationData correlationData;
3439

3540
private final long timestamp;
3641

42+
private final CountDownLatch latch = new CountDownLatch(1);
43+
3744
private String cause;
3845

46+
private boolean returned;
47+
3948
/**
4049
* @param correlationData The correlation data.
4150
* @param timestamp The timestamp.
@@ -80,6 +89,43 @@ public String getCause() {
8089
return this.cause;
8190
}
8291

92+
/**
93+
* True if a returned message has been received.
94+
* @return true if there is a return.
95+
* @since 2.2.10
96+
*/
97+
public boolean isReturned() {
98+
return this.returned;
99+
}
100+
101+
/**
102+
* Indicate that a returned message has been received.
103+
* @param isReturned true if there is a return.
104+
* @since 2.2.10
105+
*/
106+
public void setReturned(boolean isReturned) {
107+
this.returned = isReturned;
108+
}
109+
110+
/**
111+
* Return true if a return has been passed to the listener or if no return has been
112+
* received.
113+
* @return false if an expected returned message has not been passed to the listener.
114+
* @throws InterruptedException if interrupted.
115+
* @since 2.2.10
116+
*/
117+
public boolean waitForReturnIfNeeded() throws InterruptedException {
118+
return this.returned ? this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS) : true;
119+
}
120+
121+
/**
122+
* Count down the returned message latch; call after the listener has been called.
123+
* @since 2.2.10
124+
*/
125+
public void countDown() {
126+
this.latch.countDown();
127+
}
128+
83129
@Override
84130
public String toString() {
85131
return "PendingConfirm [correlationData=" + this.correlationData + (this.cause == null ? "" : " cause=" + this.cause) + "]";

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java

Lines changed: 13 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -32,9 +32,7 @@
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.ConcurrentMap;
3434
import java.util.concurrent.ConcurrentSkipListMap;
35-
import java.util.concurrent.CountDownLatch;
3635
import java.util.concurrent.ExecutorService;
37-
import java.util.concurrent.TimeUnit;
3836
import java.util.concurrent.TimeoutException;
3937

4038
import org.apache.commons.logging.Log;
@@ -96,8 +94,6 @@ public class PublisherCallbackChannelImpl
9694

9795
private static final MessagePropertiesConverter CONVERTER = new DefaultMessagePropertiesConverter();
9896

99-
private static final long RETURN_CALLBACK_TIMEOUT = 60;
100-
10197
private final Log logger = LogFactory.getLog(this.getClass());
10298

10399
private final Channel delegate;
@@ -112,12 +108,8 @@ public class PublisherCallbackChannelImpl
112108

113109
private final ExecutorService executor;
114110

115-
private final CountDownLatch returnLatch = new CountDownLatch(1);
116-
117111
private volatile java.util.function.Consumer<Channel> afterAckCallback;
118112

119-
private boolean hasReturned;
120-
121113
/**
122114
* Create a {@link PublisherCallbackChannelImpl} instance based on the provided
123115
* delegate and executor.
@@ -1016,9 +1008,9 @@ private void doHandleConfirm(boolean ack, Listener listener, PendingConfirm pend
10161008
this.executor.execute(() -> {
10171009
try {
10181010
if (listener.isConfirmListener()) {
1019-
if (this.hasReturned && !this.returnLatch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS)) {
1020-
this.logger
1021-
.error("Return callback failed to execute in " + RETURN_CALLBACK_TIMEOUT + " seconds");
1011+
if (pendingConfirm.isReturned() && !pendingConfirm.waitForReturnIfNeeded()) {
1012+
this.logger.error("Return callback failed to execute in "
1013+
+ PendingConfirm.RETURN_CALLBACK_TIMEOUT + " seconds");
10221014
}
10231015
if (this.logger.isDebugEnabled()) {
10241016
this.logger.debug("Sending confirm " + pendingConfirm);
@@ -1067,9 +1059,11 @@ public void handleReturn(int replyCode,
10671059
String routingKey,
10681060
AMQP.BasicProperties properties,
10691061
byte[] body) {
1062+
10701063
LongString returnCorrelation = (LongString) properties.getHeaders().get(RETURNED_MESSAGE_CORRELATION_KEY);
1064+
PendingConfirm confirm = null;
10711065
if (returnCorrelation != null) {
1072-
PendingConfirm confirm = this.pendingReturns.remove(returnCorrelation.toString());
1066+
confirm = this.pendingReturns.remove(returnCorrelation.toString());
10731067
if (confirm != null) {
10741068
MessageProperties messageProperties = CONVERTER.toMessageProperties(properties,
10751069
new Envelope(0L, false, exchange, routingKey), StandardCharsets.UTF_8.name());
@@ -1096,8 +1090,11 @@ public void handleReturn(int replyCode,
10961090
}
10971091
}
10981092
else {
1099-
this.hasReturned = true;
1093+
if (confirm != null) {
1094+
confirm.setReturned(true);
1095+
}
11001096
Listener listenerToInvoke = listener;
1097+
PendingConfirm toCountDown = confirm;
11011098
this.executor.execute(() -> {
11021099
try {
11031100
listenerToInvoke.handleReturn(replyCode, replyText, exchange, routingKey, properties, body);
@@ -1106,7 +1103,9 @@ public void handleReturn(int replyCode,
11061103
this.logger.error("Exception delivering returned message ", e);
11071104
}
11081105
finally {
1109-
this.returnLatch.countDown();
1106+
if (toCountDown != null) {
1107+
toCountDown.countDown();
1108+
}
11101109
}
11111110
});
11121111
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelTests.java

Lines changed: 88 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,13 +18,20 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.BDDMockito.given;
2122
import static org.mockito.BDDMockito.willAnswer;
2223
import static org.mockito.Mockito.mock;
2324

2425
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.HashMap;
29+
import java.util.List;
2530
import java.util.Properties;
31+
import java.util.UUID;
2632
import java.util.concurrent.CountDownLatch;
2733
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
2835
import java.util.concurrent.TimeUnit;
2936
import java.util.concurrent.TimeoutException;
3037
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,12 +40,15 @@
3340
import org.junit.jupiter.api.Test;
3441

3542
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
43+
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel.Listener;
3644
import org.springframework.amqp.rabbit.core.RabbitTemplate;
3745
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
3846
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
3947
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4048

49+
import com.rabbitmq.client.AMQP.BasicProperties;
4150
import com.rabbitmq.client.Channel;
51+
import com.rabbitmq.client.LongString;
4252
import com.rabbitmq.client.Method;
4353
import com.rabbitmq.client.ShutdownListener;
4454
import com.rabbitmq.client.ShutdownSignalException;
@@ -138,4 +148,82 @@ void testNotCached() throws Exception {
138148
assertThat(cacheProperties.getProperty("idleChannelsNotTx")).isEqualTo("2");
139149
}
140150

151+
@Test
152+
void confirmAlwaysAfterReturn() throws InterruptedException {
153+
Channel delegate = mock(Channel.class);
154+
ExecutorService executor = Executors.newCachedThreadPool();
155+
PublisherCallbackChannelImpl channel = new PublisherCallbackChannelImpl(delegate, executor);
156+
TheListener listener = new TheListener();
157+
channel.addListener(listener);
158+
channel.addPendingConfirm(listener, 1, new PendingConfirm(new CorrelationData("1"), 0L));
159+
HashMap<String, Object> headers = new HashMap<>();
160+
LongString correlation = mock(LongString.class);
161+
given(correlation.getBytes()).willReturn("1".getBytes());
162+
given(correlation.toString()).willReturn("1");
163+
headers.put(PublisherCallbackChannelImpl.RETURNED_MESSAGE_CORRELATION_KEY, correlation);
164+
headers.put(PublisherCallbackChannelImpl.RETURN_LISTENER_CORRELATION_KEY, listener.getUUID());
165+
BasicProperties properties = new BasicProperties.Builder().headers(headers).build();
166+
channel.handleReturn(0, "", "", "", properties, new byte[0]);
167+
channel.handleAck(1, false);
168+
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
169+
channel.addPendingConfirm(listener, 2, new PendingConfirm(new CorrelationData("1"), 0L));
170+
channel.handleReturn(0, "", "", "", properties, new byte[0]);
171+
channel.handleAck(2, false);
172+
assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
173+
assertThat(listener.calls).containsExactly("return", "confirm", "return", "confirm");
174+
}
175+
176+
private static class TheListener implements Listener {
177+
178+
private final UUID uuid = UUID.randomUUID();
179+
180+
final List<String> calls = Collections.synchronizedList(new ArrayList<>());
181+
182+
final CountDownLatch latch1 = new CountDownLatch(2);
183+
184+
final CountDownLatch latch2 = new CountDownLatch(4);
185+
186+
@Override
187+
public void handleConfirm(PendingConfirm pendingConfirm, boolean ack) {
188+
this.calls.add("confirm");
189+
this.latch1.countDown();
190+
this.latch2.countDown();
191+
}
192+
193+
@Override
194+
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
195+
BasicProperties properties, byte[] body) throws IOException {
196+
197+
try {
198+
Thread.sleep(500);
199+
}
200+
catch (InterruptedException e) {
201+
Thread.currentThread().interrupt();
202+
}
203+
this.calls.add("return");
204+
this.latch1.countDown();
205+
this.latch2.countDown();
206+
}
207+
208+
@Override
209+
public void revoke(Channel channel) {
210+
}
211+
212+
@Override
213+
public String getUUID() {
214+
return this.uuid.toString();
215+
}
216+
217+
@Override
218+
public boolean isConfirmListener() {
219+
return true;
220+
}
221+
222+
@Override
223+
public boolean isReturnListener() {
224+
return true;
225+
}
226+
227+
}
228+
141229
}

0 commit comments

Comments
 (0)