Skip to content

Commit 8a38e65

Browse files
garyrussellartembilan
authored andcommitted
AMQP-817: Synthetic NACKs on separate thread
JIRA: https://jira.spring.io/browse/AMQP-817 Previously, synthetic nacks were processed on the client thread, which caused a deadlock if the user attempted some operation such as creating a new channel on that thread. Also, when generated from a application-level `close()` a similar deadlock between the connection factory and `PublisherCallbackChannelImpl` could occur. Synthetic nacks are now always processed on a separate thread.
1 parent b40019b commit 8a38e65

File tree

4 files changed

+39
-16
lines changed

4 files changed

+39
-16
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ private Channel doCreateBareChannel(ChannelCachingConnectionProxy connection, bo
623623
}
624624
if (this.publisherConfirms || this.publisherReturns) {
625625
if (!(channel instanceof PublisherCallbackChannelImpl)) {
626-
channel = new PublisherCallbackChannelImpl(channel);
626+
channel = new PublisherCallbackChannelImpl(channel, getExecutorService());
627627
}
628628
}
629629
if (channel != null) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/PublisherCallbackChannelImpl.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.util.concurrent.ConcurrentHashMap;
3232
import java.util.concurrent.ConcurrentMap;
3333
import java.util.concurrent.ConcurrentSkipListMap;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
3436
import java.util.concurrent.TimeoutException;
3537

3638
import org.apache.commons.logging.Log;
@@ -79,6 +81,8 @@
7981
public class PublisherCallbackChannelImpl
8082
implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {
8183

84+
private static final ExecutorService DEFAULT_EXECUTOR = Executors.newSingleThreadExecutor();
85+
8286
private final Log logger = LogFactory.getLog(this.getClass());
8387

8488
private final Channel delegate;
@@ -91,9 +95,16 @@ public class PublisherCallbackChannelImpl
9195

9296
private volatile java.util.function.Consumer<Channel> afterAckCallback;
9397

98+
private final ExecutorService executor;
99+
94100
public PublisherCallbackChannelImpl(Channel delegate) {
101+
this(delegate, null);
102+
}
103+
104+
public PublisherCallbackChannelImpl(Channel delegate, ExecutorService executor) {
95105
delegate.addShutdownListener(this);
96106
this.delegate = delegate;
107+
this.executor = executor != null ? executor : DEFAULT_EXECUTOR;
97108
}
98109

99110
@Override
@@ -781,7 +792,11 @@ public void close() throws IOException, TimeoutException {
781792
this.logger.trace(this.delegate + " is already closed");
782793
}
783794
}
784-
generateNacksForPendingAcks("Channel closed by application");
795+
shutdownCompleted("Channel closed by application");
796+
}
797+
798+
private void shutdownCompleted(String cause) {
799+
this.executor.execute(() -> generateNacksForPendingAcks(cause));
785800
}
786801

787802
private synchronized void generateNacksForPendingAcks(String cause) {
@@ -997,7 +1012,7 @@ public void handleReturn(int replyCode,
9971012

9981013
@Override
9991014
public void shutdownCompleted(ShutdownSignalException cause) {
1000-
generateNacksForPendingAcks(cause.getMessage());
1015+
shutdownCompleted(cause.getMessage());
10011016
}
10021017

10031018
// Object

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,10 @@ public void testPublisherConfirmNotReceivedMultiThreads() throws Exception {
401401
exec.shutdown();
402402
assertTrue(exec.awaitTermination(10, TimeUnit.SECONDS));
403403
ccf.destroy();
404+
int n = 0;
405+
while (n++ < 100 && pendingConfirms.size() > 0) {
406+
Thread.sleep(100);
407+
}
404408
assertEquals(0, pendingConfirms.size());
405409
}
406410

@@ -739,7 +743,7 @@ private void testPublisherConfirmCloseConcurrency(final int closeAfter) throws E
739743
when(mockConnection.createChannel()).thenReturn(mockChannel1, mockChannel2);
740744

741745
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
742-
ccf.setExecutor(mock(ExecutorService.class));
746+
ccf.setExecutor(Executors.newSingleThreadExecutor());
743747
ccf.setPublisherConfirms(true);
744748
final RabbitTemplate template = new RabbitTemplate(ccf);
745749

@@ -768,13 +772,12 @@ private void testPublisherConfirmCloseConcurrency(final int closeAfter) throws E
768772
@Test
769773
public void testPublisherCallbackChannelImplCloseWithPending() throws Exception {
770774

771-
final AtomicInteger nacks = new AtomicInteger();
772-
773775
Listener listener = mock(Listener.class);
776+
final CountDownLatch latch = new CountDownLatch(2);
774777
doAnswer(invocation -> {
775778
boolean ack = invocation.getArgument(1);
776779
if (!ack) {
777-
nacks.incrementAndGet();
780+
latch.countDown();
778781
}
779782
return null;
780783
}).when(listener).handleConfirm(any(PendingConfirm.class), anyBoolean());
@@ -795,8 +798,13 @@ public void testPublisherCallbackChannelImplCloseWithPending() throws Exception
795798

796799
channel.close();
797800

801+
assertTrue(latch.await(10, TimeUnit.SECONDS));
802+
803+
int n = 0;
804+
while (n++ < 100 && TestUtils.getPropertyValue(channel, "pendingConfirms", Map.class).size() > 0) {
805+
Thread.sleep(100);
806+
}
798807
assertEquals(0, TestUtils.getPropertyValue(channel, "pendingConfirms", Map.class).size());
799-
assertEquals(2, nacks.get());
800808

801809
}
802810

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests3.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicInteger;
2525

26-
import org.junit.jupiter.api.Disabled;
2726
import org.junit.jupiter.api.Test;
2827

2928
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@@ -37,17 +36,18 @@
3736

3837
/**
3938
* @author Gary Russell
40-
*
4139
* @since 2.1
4240
*
4341
*/
44-
@RabbitAvailable(queues = RabbitTemplatePublisherCallbacksIntegrationTests3.QUEUE)
42+
@RabbitAvailable(queues = { RabbitTemplatePublisherCallbacksIntegrationTests3.QUEUE1,
43+
RabbitTemplatePublisherCallbacksIntegrationTests3.QUEUE2 })
4544
public class RabbitTemplatePublisherCallbacksIntegrationTests3 {
4645

47-
public static final String QUEUE = "defer.close";
46+
public static final String QUEUE1 = "synthetic.nack";
47+
48+
public static final String QUEUE2 = "defer.close";
4849

4950
@Test
50-
@Disabled
5151
public void testRepublishOnNackThreadNoExchange() throws Exception {
5252
CachingConnectionFactory cf = new CachingConnectionFactory(
5353
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
@@ -56,13 +56,13 @@ public void testRepublishOnNackThreadNoExchange() throws Exception {
5656
final CountDownLatch confirmLatch = new CountDownLatch(2);
5757
template.setConfirmCallback((cd, a, c) -> {
5858
if (confirmLatch.getCount() == 2) {
59-
template.convertAndSend(QUEUE, ((MyCD) cd).payload); // deadlock creating new channel
59+
template.convertAndSend(QUEUE1, ((MyCD) cd).payload);
6060
}
6161
confirmLatch.countDown();
6262
});
6363
template.convertAndSend("bad.exchange", "junk", "foo", new MyCD("foo"));
6464
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
65-
assertThat(template.receive(QUEUE, 10_000)).isNotNull();
65+
assertThat(template.receive(QUEUE1, 10_000)).isNotNull();
6666
}
6767

6868
@Test
@@ -90,7 +90,7 @@ public void testDeferredChannelCache() throws Exception {
9090
channel2.close();
9191
conn.close();
9292
assertThat(TestUtils.getPropertyValue(cf, "cachedChannelsNonTransactional", List.class).size()).isEqualTo(2);
93-
template.convertAndSend("", QUEUE + "junk", "foo", new MyCD("foo"));
93+
template.convertAndSend("", QUEUE2 + "junk", "foo", new MyCD("foo"));
9494
assertThat(returnLatch.await(10, TimeUnit.SECONDS)).isTrue();
9595
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
9696
assertThat(cacheCount.get()).isEqualTo(1);

0 commit comments

Comments
 (0)