Skip to content

Commit 5d73342

Browse files
garyrussellartembilan
authored andcommitted
GH-1038: RT: More evaluateFastReplyTo Fixes
Fixes #1038 The previous fix to catch `AmqpConnectException` was incorrect - the `ShutdownSignalException` for the passive queue declaration failure is wrapped in an `AmqpConnectException` so we would have failed to detect the failure. Also, it has been reported that sometimes an `AmqpIOException` is thrown. - Add `AmqpIOException` to the catch block - Search for, and explitly check for the queue declaration failed exception - For all other cases rethrow so we can test again **cherry-pick to all supported** * * Fix log messages
1 parent 24c63e1 commit 5d73342

File tree

2 files changed

+76
-9
lines changed

2 files changed

+76
-9
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import org.springframework.amqp.AmqpConnectException;
4040
import org.springframework.amqp.AmqpException;
41+
import org.springframework.amqp.AmqpIOException;
4142
import org.springframework.amqp.AmqpIllegalStateException;
4243
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
4344
import org.springframework.amqp.core.Address;
@@ -103,6 +104,7 @@
103104
import com.rabbitmq.client.Envelope;
104105
import com.rabbitmq.client.GetResponse;
105106
import com.rabbitmq.client.ShutdownListener;
107+
import com.rabbitmq.client.ShutdownSignalException;
106108

107109
/**
108110
* <p>
@@ -951,18 +953,25 @@ protected boolean useDirectReplyTo() {
951953
return true;
952954
});
953955
}
954-
catch (AmqpConnectException ex) {
955-
if (logger.isDebugEnabled()) {
956-
logger.debug("Connection error, deferring directReplyTo detection");
956+
catch (AmqpConnectException | AmqpIOException ex) {
957+
Throwable cause = ex;
958+
while (cause != null && !(cause instanceof ShutdownSignalException)) {
959+
cause = cause.getCause();
960+
}
961+
if (cause instanceof ShutdownSignalException) {
962+
if (RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) cause)) {
963+
if (logger.isWarnEnabled()) {
964+
logger.warn("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary "
965+
+ "queues will be used: " + cause.getMessage() + ".");
966+
}
967+
this.replyAddress = null;
968+
return false;
969+
}
957970
}
958-
throw ex;
959-
}
960-
catch (Exception e) {
961971
if (logger.isDebugEnabled()) {
962-
logger.warn("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary "
963-
+ "queues will be used:" + e.getMessage() + ".");
972+
logger.debug("IO error, deferring directReplyTo detection: " + ex.toString());
964973
}
965-
this.replyAddress = null;
974+
throw ex;
966975
}
967976
}
968977
return false;

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.amqp.AmqpAuthenticationException;
5252
import org.springframework.amqp.AmqpConnectException;
5353
import org.springframework.amqp.AmqpException;
54+
import org.springframework.amqp.AmqpIOException;
5455
import org.springframework.amqp.core.Address;
5556
import org.springframework.amqp.core.Message;
5657
import org.springframework.amqp.core.MessagePostProcessor;
@@ -61,6 +62,7 @@
6162
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
6263
import org.springframework.amqp.rabbit.connection.ChannelProxy;
6364
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
65+
import org.springframework.amqp.rabbit.connection.RabbitUtils;
6466
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
6567
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
6668
import org.springframework.amqp.support.converter.SimpleMessageConverter;
@@ -241,6 +243,62 @@ public void testEvaluateDirectReplyToWithConnectException() throws Exception {
241243
assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isFalse();
242244
}
243245

246+
@Test
247+
public void testEvaluateDirectReplyToWithIOException() throws Exception {
248+
org.springframework.amqp.rabbit.connection.ConnectionFactory mockConnectionFactory =
249+
mock(org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
250+
willThrow(new AmqpIOException(null)).given(mockConnectionFactory).createConnection();
251+
RabbitTemplate template = new RabbitTemplate(mockConnectionFactory);
252+
assertThatThrownBy(() -> template.convertSendAndReceive("foo")).isInstanceOf(AmqpIOException.class);
253+
assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isFalse();
254+
}
255+
256+
@Test
257+
public void testEvaluateDirectReplyToWithIOExceptionDeclareFailed() throws Exception {
258+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
259+
Connection mockConnection = mock(Connection.class);
260+
Channel mockChannel = mock(Channel.class);
261+
262+
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
263+
given(mockConnection.isOpen()).willReturn(true);
264+
given(mockConnection.createChannel()).willReturn(mockChannel);
265+
AMQP.Channel.Close mockMethod = mock(AMQP.Channel.Close.class);
266+
given(mockMethod.getReplyCode()).willReturn(AMQP.NOT_FOUND);
267+
given(mockMethod.getClassId()).willReturn(RabbitUtils.QUEUE_CLASS_ID_50);
268+
given(mockMethod.getMethodId()).willReturn(RabbitUtils.DECLARE_METHOD_ID_10);
269+
willThrow(new ShutdownSignalException(true, false, mockMethod, null)).given(mockChannel)
270+
.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO);
271+
given(mockChannel.queueDeclare()).willReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
272+
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
273+
connectionFactory.setExecutor(mock(ExecutorService.class));
274+
RabbitTemplate template = new RabbitTemplate(connectionFactory);
275+
template.setReplyTimeout(1);
276+
template.convertSendAndReceive("foo");
277+
assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isTrue();
278+
assertThat(TestUtils.getPropertyValue(template, "usingFastReplyTo", Boolean.class)).isFalse();
279+
}
280+
281+
@Test
282+
public void testEvaluateDirectReplyToOK() throws Exception {
283+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
284+
Connection mockConnection = mock(Connection.class);
285+
Channel mockChannel = mock(Channel.class);
286+
given(mockChannel.isOpen()).willReturn(true);
287+
288+
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
289+
given(mockConnection.isOpen()).willReturn(true);
290+
given(mockConnection.createChannel()).willReturn(mockChannel);
291+
given(mockChannel.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO))
292+
.willReturn(new AMQImpl.Queue.DeclareOk(Address.AMQ_RABBITMQ_REPLY_TO, 0, 0));
293+
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
294+
connectionFactory.setExecutor(mock(ExecutorService.class));
295+
RabbitTemplate template = new RabbitTemplate(connectionFactory);
296+
template.setReplyTimeout(1);
297+
template.convertSendAndReceive("foo");
298+
assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isTrue();
299+
assertThat(TestUtils.getPropertyValue(template, "usingFastReplyTo", Boolean.class)).isTrue();
300+
}
301+
244302
@Test
245303
public void testRecovery() throws Exception {
246304
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);

0 commit comments

Comments
 (0)