Skip to content

Commit 2e99acd

Browse files
garyrussellartembilan
authored andcommitted
AMQP-815: Fix typo in doc for ConnectionFB
JIRA: https://jira.spring.io/browse/AMQP-815 AMQP-797: Defer caching publisher callback channel JIRA: https://jira.spring.io/browse/AMQP-797 Defer caching publisher callback channels until acks received. - another user might perform an erroneous call that forces the channel closed and the confirmations will be lost. - if the factory is destroyed with channels in that state, force them closed to generate nacks Fix package tangle; use Channel instead of ChannelProxy in API; docs to follow. More polishing - map channel to proxy to simplify interaction. * Fix some typos and code style
1 parent 90bac89 commit 2e99acd

File tree

11 files changed

+325
-98
lines changed

11 files changed

+325
-98
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 92 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.lang.reflect.Proxy;
2424
import java.net.URI;
2525
import java.util.Arrays;
26+
import java.util.Collection;
2627
import java.util.HashMap;
2728
import java.util.HashSet;
2829
import java.util.LinkedList;
@@ -32,6 +33,8 @@
3233
import java.util.Properties;
3334
import java.util.Set;
3435
import java.util.concurrent.BlockingDeque;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
import java.util.concurrent.ConcurrentMap;
3538
import java.util.concurrent.ExecutorService;
3639
import java.util.concurrent.Executors;
3740
import java.util.concurrent.LinkedBlockingDeque;
@@ -92,7 +95,7 @@
9295
*/
9396
@ManagedResource
9497
public class CachingConnectionFactory extends AbstractConnectionFactory
95-
implements InitializingBean, ShutdownListener, PublisherCallbackChannelConnectionFactory {
98+
implements InitializingBean, ShutdownListener {
9699

97100
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
98101

@@ -123,10 +126,10 @@ public enum CacheMode {
123126
private final Set<ChannelCachingConnectionProxy> allocatedConnections = new HashSet<>();
124127

125128
private final Map<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>
126-
allocatedConnectionNonTransactionalChannels = new HashMap<>();
129+
allocatedConnectionNonTransactionalChannels = new HashMap<>();
127130

128131
private final Map<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>
129-
allocatedConnectionTransactionalChannels = new HashMap<>();
132+
allocatedConnectionTransactionalChannels = new HashMap<>();
130133

131134
private final BlockingDeque<ChannelCachingConnectionProxy> idleConnections = new LinkedBlockingDeque<>();
132135

@@ -245,9 +248,9 @@ private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitCon
245248
if (!isPublisherFactory) {
246249
if (rabbitConnectionFactory.isAutomaticRecoveryEnabled()) {
247250
logger.warn("***\nAutomatic Recovery is Enabled in the provided connection factory;\n"
248-
+ "while Spring AMQP is compatible with this feature, it\n"
249-
+ "prefers to use its own recovery mechanisms; when this option is true, you may receive\n"
250-
+ "'AutoRecoverConnectionNotCurrentlyOpenException's until the connection is recovered.");
251+
+ "while Spring AMQP is compatible with this feature, it\n"
252+
+ "prefers to use its own recovery mechanisms; when this option is true, you may receive\n"
253+
+ "'AutoRecoverConnectionNotCurrentlyOpenException's until the connection is recovered.");
251254
}
252255
this.publisherConnectionFactory = new CachingConnectionFactory(getRabbitConnectionFactory(),
253256
true);
@@ -471,7 +474,7 @@ private Channel getChannel(ChannelCachingConnectionProxy connection, boolean tra
471474
}
472475
if (logger.isDebugEnabled()) {
473476
logger.debug(
474-
"Acquired permit for " + connection + ", remaining:" + checkoutPermits.availablePermits());
477+
"Acquired permit for " + connection + ", remaining:" + checkoutPermits.availablePermits());
475478
}
476479
}
477480
catch (InterruptedException e) {
@@ -551,7 +554,7 @@ private Channel getChannel(ChannelCachingConnectionProxy connection, boolean tra
551554
checkoutPermits.release();
552555
if (logger.isDebugEnabled()) {
553556
logger.debug("Could not get channel; released permit for " + connection + ", remaining:"
554-
+ checkoutPermits.availablePermits());
557+
+ checkoutPermits.availablePermits());
555558
}
556559
}
557560
throw e;
@@ -792,31 +795,33 @@ public void resetConnection() {
792795
/*
793796
* Reset the Channel cache and underlying shared Connection, to be reinitialized on next access.
794797
*/
795-
protected void reset(List<ChannelProxy> channels, List<ChannelProxy> txChannels) {
798+
protected void reset(List<ChannelProxy> channels, List<ChannelProxy> txChannels,
799+
Map<Channel, ChannelProxy> channelsAwaitingAcks) {
800+
796801
this.active = false;
797-
synchronized (channels) {
798-
for (ChannelProxy channel : channels) {
799-
try {
800-
channel.close();
801-
}
802-
catch (Exception ex) {
803-
logger.trace("Could not close cached Rabbit Channel", ex);
804-
}
805-
}
806-
channels.clear();
802+
closeAndClear(channels);
803+
closeAndClear(txChannels);
804+
closeChannels(channelsAwaitingAcks.values());
805+
channelsAwaitingAcks.clear();
806+
this.active = true;
807+
}
808+
809+
protected void closeAndClear(Collection<ChannelProxy> theChannels) {
810+
synchronized (theChannels) {
811+
closeChannels(theChannels);
812+
theChannels.clear();
807813
}
808-
synchronized (txChannels) {
809-
for (ChannelProxy channel : txChannels) {
810-
try {
811-
channel.close();
812-
}
813-
catch (Exception ex) {
814-
logger.trace("Could not close cached Rabbit Channel", ex);
815-
}
814+
}
815+
816+
protected void closeChannels(Collection<ChannelProxy> theChannels) {
817+
for (ChannelProxy channel : theChannels) {
818+
try {
819+
channel.close();
820+
}
821+
catch (Exception ex) {
822+
logger.trace("Could not close cached Rabbit Channel", ex);
816823
}
817-
txChannels.clear();
818824
}
819-
this.active = true;
820825
}
821826

822827
@ManagedAttribute
@@ -829,12 +834,12 @@ public Properties getCacheProperties() {
829834
props.setProperty("connectionCacheSize", Integer.toString(this.connectionCacheSize));
830835
props.setProperty("openConnections", Integer.toString(countOpenConnections()));
831836
props.setProperty("idleConnections", Integer.toString(this.idleConnections.size()));
832-
props.setProperty("idleConnectionsHighWater", Integer.toString(this.connectionHighWaterMark.get()));
837+
props.setProperty("idleConnectionsHighWater", Integer.toString(this.connectionHighWaterMark.get()));
833838
for (ChannelCachingConnectionProxy proxy : this.allocatedConnections) {
834839
putConnectionName(props, proxy, ":" + proxy.getLocalPort());
835840
}
836841
for (Entry<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>> entry :
837-
this.allocatedConnectionTransactionalChannels.entrySet()) {
842+
this.allocatedConnectionTransactionalChannels.entrySet()) {
838843
int port = entry.getKey().getLocalPort();
839844
if (port > 0 && entry.getKey().isOpen()) {
840845
LinkedList<ChannelProxy> channelList = entry.getValue();
@@ -844,7 +849,7 @@ public Properties getCacheProperties() {
844849
}
845850
}
846851
for (Entry<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>> entry :
847-
this.allocatedConnectionNonTransactionalChannels.entrySet()) {
852+
this.allocatedConnectionNonTransactionalChannels.entrySet()) {
848853
int port = entry.getKey().getLocalPort();
849854
if (port > 0 && entry.getKey().isOpen()) {
850855
LinkedList<ChannelProxy> channelList = entry.getValue();
@@ -921,7 +926,9 @@ private final class CachedChannelInvocationHandler implements InvocationHandler
921926

922927
private final boolean transactional;
923928

924-
private volatile boolean confirmSelected = CachingConnectionFactory.this.simplePublisherConfirms;
929+
private final boolean confirmSelected = CachingConnectionFactory.this.simplePublisherConfirms;
930+
931+
private final boolean publisherConfirms = CachingConnectionFactory.this.publisherConfirms;
925932

926933
private volatile Channel target;
927934

@@ -959,7 +966,7 @@ else if (methodName.equals("close")) {
959966
// Handle close method: don't pass the call on.
960967
if (CachingConnectionFactory.this.active) {
961968
synchronized (this.channelList) {
962-
if (!RabbitUtils.isPhysicalCloseRequired() &&
969+
if (CachingConnectionFactory.this.active && !RabbitUtils.isPhysicalCloseRequired() &&
963970
(this.channelList.size() < getChannelCacheSize()
964971
|| this.channelList.contains(proxy))) {
965972
releasePermitIfNecessary(proxy);
@@ -1056,7 +1063,7 @@ private void releasePermitIfNecessary(Object proxy) {
10561063
checkoutPermits.release();
10571064
if (logger.isDebugEnabled()) {
10581065
logger.debug("Released permit for '" + this.theConnection + "', remaining: "
1059-
+ checkoutPermits.availablePermits());
1066+
+ checkoutPermits.availablePermits());
10601067
}
10611068
}
10621069
else {
@@ -1088,13 +1095,42 @@ private void logicalClose(ChannelProxy proxy) throws Exception {
10881095
}
10891096
}
10901097
}
1091-
// Allow for multiple close calls...
1092-
if (!this.channelList.contains(proxy)) {
1093-
if (logger.isTraceEnabled()) {
1094-
logger.trace("Returning cached Channel: " + this.target);
1098+
if (CachingConnectionFactory.this.active && this.publisherConfirms
1099+
&& proxy instanceof PublisherCallbackChannel) {
1100+
1101+
this.theConnection.channelsAwaitingAcks.put(this.target, proxy);
1102+
((PublisherCallbackChannel) proxy)
1103+
.setAfterAckCallback(c ->
1104+
returnToCache(this.theConnection.channelsAwaitingAcks.remove(c)));
1105+
}
1106+
else {
1107+
returnToCache(proxy);
1108+
}
1109+
}
1110+
1111+
private void returnToCache(Channel proxy) {
1112+
if (proxy != null) {
1113+
synchronized (this.channelList) {
1114+
// Allow for multiple close calls...
1115+
if (CachingConnectionFactory.this.active) {
1116+
if (!this.channelList.contains(proxy)) {
1117+
if (logger.isTraceEnabled()) {
1118+
logger.trace("Returning cached Channel: " + this.target);
1119+
}
1120+
this.channelList.addLast((ChannelProxy) proxy);
1121+
setHighWaterMark();
1122+
}
1123+
}
1124+
else {
1125+
if (proxy.isOpen()) {
1126+
try {
1127+
physicalClose();
1128+
}
1129+
catch (Exception e) {
1130+
}
1131+
}
1132+
}
10951133
}
1096-
this.channelList.addLast(proxy);
1097-
setHighWaterMark();
10981134
}
10991135
}
11001136

@@ -1154,14 +1190,18 @@ private void asyncClose() {
11541190
catch (InterruptedException e1) {
11551191
Thread.currentThread().interrupt();
11561192
}
1157-
catch (Exception e2) { }
1193+
catch (Exception e2) {
1194+
}
11581195
finally {
11591196
try {
11601197
channel.close();
11611198
}
1162-
catch (IOException e3) { }
1163-
catch (AlreadyClosedException e4) { }
1164-
catch (TimeoutException e5) { }
1199+
catch (IOException e3) {
1200+
}
1201+
catch (AlreadyClosedException e4) {
1202+
}
1203+
catch (TimeoutException e5) {
1204+
}
11651205
catch (ShutdownSignalException e6) {
11661206
if (!RabbitUtils.isNormalShutdown(e6)) {
11671207
logger.debug("Unexpected exception on deferred close", e6);
@@ -1177,6 +1217,8 @@ private class ChannelCachingConnectionProxy implements ConnectionProxy { // NOSO
11771217

11781218
private final AtomicBoolean closeNotified = new AtomicBoolean(false);
11791219

1220+
private final ConcurrentMap<Channel, ChannelProxy> channelsAwaitingAcks = new ConcurrentHashMap<>();
1221+
11801222
private volatile Connection target;
11811223

11821224
ChannelCachingConnectionProxy(Connection target) {
@@ -1248,11 +1290,12 @@ private int countOpenIdleConnections() {
12481290
public void destroy() {
12491291
if (CachingConnectionFactory.this.cacheMode == CacheMode.CHANNEL) {
12501292
reset(CachingConnectionFactory.this.cachedChannelsNonTransactional,
1251-
CachingConnectionFactory.this.cachedChannelsTransactional);
1293+
CachingConnectionFactory.this.cachedChannelsTransactional, this.channelsAwaitingAcks);
12521294
}
12531295
else {
12541296
reset(CachingConnectionFactory.this.allocatedConnectionNonTransactionalChannels.get(this),
1255-
CachingConnectionFactory.this.allocatedConnectionTransactionalChannels.get(this));
1297+
CachingConnectionFactory.this.allocatedConnectionTransactionalChannels.get(this),
1298+
this.channelsAwaitingAcks);
12561299
}
12571300
if (this.target != null) {
12581301
RabbitUtils.closeConnection(this.target);
@@ -1289,8 +1332,8 @@ public int getLocalPort() {
12891332
@Override
12901333
public String toString() {
12911334
return "Proxy@" + ObjectUtils.getIdentityHexString(this) + " "
1292-
+ (CachingConnectionFactory.this.cacheMode == CacheMode.CHANNEL ? "Shared " : "Dedicated ")
1293-
+ "Rabbit Connection: " + this.target;
1335+
+ (CachingConnectionFactory.this.cacheMode == CacheMode.CHANNEL ? "Shared " : "Dedicated ")
1336+
+ "Rabbit Connection: " + this.target;
12941337
}
12951338

12961339
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,22 @@ default boolean isSimplePublisherConfirms() {
6565
return false;
6666
}
6767

68+
/**
69+
* Return true if publisher confirms are enabled.
70+
* @return publisherConfirms.
71+
* @since 2.1
72+
*/
73+
default boolean isPublisherConfirms() {
74+
return false;
75+
}
76+
77+
/**
78+
* Return true if publisher returns are enabled.
79+
* @return publisherReturns.
80+
* @since 2.1
81+
*/
82+
default boolean isPublisherReturns() {
83+
return false;
84+
}
85+
6886
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelConnectionFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,11 +20,13 @@
2020
* Connection factories implementing this interface return a connection that
2121
* provides {@code PublisherCallbackChannel} channel instances when confirms
2222
* or returns are enabled.
23+
* @deprecated in favor of default methods on ConnectionFactory.
2324
*
2425
* @author Gary Russell
2526
* @since 1.5
2627
*
2728
*/
29+
@Deprecated
2830
public interface PublisherCallbackChannelConnectionFactory {
2931

3032
/**

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.springframework.amqp.rabbit.connection.Connection;
5555
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
5656
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
57-
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannelConnectionFactory;
5857
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
5958
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
6059
import org.springframework.amqp.rabbit.connection.RabbitUtils;
@@ -86,6 +85,7 @@
8685
import org.springframework.expression.Expression;
8786
import org.springframework.expression.spel.standard.SpelExpressionParser;
8887
import org.springframework.expression.spel.support.StandardEvaluationContext;
88+
import org.springframework.lang.Nullable;
8989
import org.springframework.retry.RecoveryCallback;
9090
import org.springframework.retry.RetryCallback;
9191
import org.springframework.retry.support.RetryTemplate;
@@ -1842,8 +1842,8 @@ private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionF
18421842
}
18431843

18441844
@Override
1845-
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
1846-
com.rabbitmq.client.ConfirmCallback nacks) {
1845+
public <T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.ConfirmCallback acks,
1846+
@Nullable com.rabbitmq.client.ConfirmCallback nacks) {
18471847

18481848
final Channel currentChannel = this.dedicatedChannels.get();
18491849
Assert.state(currentChannel == null, () -> "Nested invoke() calls are not supported; channel '" + currentChannel
@@ -1870,7 +1870,9 @@ public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCal
18701870
if (channel == null) {
18711871
throw new IllegalStateException("Connection returned a null channel");
18721872
}
1873-
RabbitUtils.setPhysicalCloseRequired(channel, true);
1873+
if (!getConnectionFactory().isPublisherConfirms()) {
1874+
RabbitUtils.setPhysicalCloseRequired(channel, true);
1875+
}
18741876
this.dedicatedChannels.set(channel);
18751877
}
18761878
catch (RuntimeException e) {
@@ -1935,14 +1937,8 @@ public void waitForConfirmsOrDie(long timeout) {
19351937
}
19361938

19371939
public void determineConfirmsReturnsCapability(ConnectionFactory connectionFactory) {
1938-
if (connectionFactory instanceof PublisherCallbackChannelConnectionFactory) {
1939-
PublisherCallbackChannelConnectionFactory pcccf =
1940-
(PublisherCallbackChannelConnectionFactory) connectionFactory;
1941-
this.confirmsOrReturnsCapable = pcccf.isPublisherConfirms() || pcccf.isPublisherReturns();
1942-
}
1943-
else {
1944-
this.confirmsOrReturnsCapable = Boolean.FALSE;
1945-
}
1940+
this.confirmsOrReturnsCapable =
1941+
connectionFactory.isPublisherConfirms() || connectionFactory.isPublisherReturns();
19461942
}
19471943

19481944
/**

0 commit comments

Comments
 (0)