Skip to content

Commit ee75eaf

Browse files
garyrussellartembilan
authored andcommitted
Close auto-recovering connection
When a connection is auto-recovered, the `RabbitAdmin` is not invoked to re-declare auto-delete queues because the connection listeners are not invoked. Close an auto-recovered connection before it is recovered. Tested with a stand-alone spring-cloud-bus application. **Cherry-pick to 2.0.x, 1.7.x**
1 parent 4165542 commit ee75eaf

File tree

2 files changed

+20
-114
lines changed

2 files changed

+20
-114
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,26 @@ protected final Connection createBareConnection() {
467467
}
468468

469469
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout);
470+
if (rabbitConnection instanceof AutorecoveringConnection) {
471+
((AutorecoveringConnection) rabbitConnection).addRecoveryListener(new RecoveryListener() {
472+
473+
@Override
474+
public void handleRecoveryStarted(Recoverable recoverable) {
475+
handleRecovery(recoverable);
476+
}
477+
478+
@Override
479+
public void handleRecovery(Recoverable recoverable) {
480+
try {
481+
connection.close();
482+
}
483+
catch (Exception e) {
484+
AbstractConnectionFactory.this.logger.error("Failed to close auto-recover connection", e);
485+
}
486+
}
487+
488+
});
489+
}
470490
if (this.logger.isInfoEnabled()) {
471491
this.logger.info("Created new connection: " + connectionName + "/" + connection);
472492
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryIntegrationTests.java

Lines changed: 0 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,11 @@
2626
import static org.junit.Assert.assertTrue;
2727
import static org.junit.Assert.fail;
2828
import static org.mockito.ArgumentMatchers.anyString;
29-
import static org.mockito.BDDMockito.willReturn;
30-
import static org.mockito.Mockito.atLeastOnce;
3129
import static org.mockito.Mockito.mock;
3230
import static org.mockito.Mockito.never;
3331
import static org.mockito.Mockito.spy;
3432
import static org.mockito.Mockito.verify;
3533

36-
import java.io.IOException;
3734
import java.net.ServerSocket;
3835
import java.net.Socket;
3936
import java.util.ArrayList;
@@ -58,11 +55,9 @@
5855
import org.junit.Rule;
5956
import org.junit.Test;
6057
import org.junit.rules.ExpectedException;
61-
import org.mockito.ArgumentCaptor;
6258

6359
import org.springframework.amqp.AmqpApplicationContextClosedException;
6460
import org.springframework.amqp.AmqpAuthenticationException;
65-
import org.springframework.amqp.AmqpException;
6661
import org.springframework.amqp.AmqpIOException;
6762
import org.springframework.amqp.AmqpResourceNotAvailableException;
6863
import org.springframework.amqp.AmqpTimeoutException;
@@ -80,10 +75,6 @@
8075

8176
import com.rabbitmq.client.Channel;
8277
import com.rabbitmq.client.DefaultConsumer;
83-
import com.rabbitmq.client.Recoverable;
84-
import com.rabbitmq.client.RecoveryListener;
85-
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
86-
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
8778

8879
/**
8980
* @author Dave Syer
@@ -376,111 +367,6 @@ public void testHardErrorAndReconnectNoAuto() throws Exception {
376367
assertEquals(null, result);
377368
}
378369

379-
@Test
380-
public void testHardErrorAndReconnectAuto() throws Exception {
381-
this.connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
382-
Log cfLogger = spyOnLogger(this.connectionFactory);
383-
willReturn(true).given(cfLogger).isDebugEnabled();
384-
RabbitTemplate template = new RabbitTemplate(connectionFactory);
385-
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
386-
Queue queue = new Queue(CF_INTEGRATION_TEST_QUEUE);
387-
admin.declareQueue(queue);
388-
final String route = queue.getName();
389-
390-
final CountDownLatch latch = new CountDownLatch(1);
391-
final CountDownLatch recoveryLatch = new CountDownLatch(1);
392-
final RecoveryListener channelRecoveryListener = new RecoveryListener() {
393-
394-
@Override
395-
public void handleRecoveryStarted(Recoverable recoverable) {
396-
if (logger.isDebugEnabled()) {
397-
logger.debug("Channel recovery started: " + asString(recoverable));
398-
}
399-
}
400-
401-
@Override
402-
public void handleRecovery(Recoverable recoverable) {
403-
try {
404-
((Channel) recoverable).basicCancel("testHardErrorAndReconnect");
405-
}
406-
catch (IOException e) {
407-
}
408-
if (logger.isDebugEnabled()) {
409-
logger.debug("Channel recovery complete: " + asString(recoverable));
410-
}
411-
}
412-
413-
private String asString(Recoverable recoverable) {
414-
// TODO: https://github.com/rabbitmq/rabbitmq-java-client/issues/217
415-
return ((AutorecoveringChannel) recoverable).getDelegate().toString();
416-
}
417-
418-
};
419-
final RecoveryListener connectionRecoveryListener = new RecoveryListener() {
420-
421-
@Override
422-
public void handleRecoveryStarted(Recoverable recoverable) {
423-
if (logger.isDebugEnabled()) {
424-
logger.debug("Connection recovery started: " + recoverable);
425-
}
426-
}
427-
428-
@Override
429-
public void handleRecovery(Recoverable recoverable) {
430-
if (logger.isDebugEnabled()) {
431-
logger.debug("Connection recovery complete: " + recoverable);
432-
}
433-
recoveryLatch.countDown();
434-
}
435-
436-
};
437-
Object connection = ((ConnectionProxy) this.connectionFactory.createConnection()).getTargetConnection();
438-
connection = TestUtils.getPropertyValue(connection, "delegate");
439-
if (connection instanceof AutorecoveringConnection) {
440-
((AutorecoveringConnection) connection).addRecoveryListener(connectionRecoveryListener);
441-
}
442-
try {
443-
template.execute(channel -> {
444-
channel.getConnection().addShutdownListener(cause -> {
445-
logger.info("Error", cause);
446-
latch.countDown();
447-
// This will be thrown on the Connection thread just before it dies, so basically ignored
448-
throw new RuntimeException(cause);
449-
});
450-
Channel targetChannel = ((ChannelProxy) channel).getTargetChannel();
451-
if (targetChannel instanceof AutorecoveringChannel) {
452-
((AutorecoveringChannel) targetChannel).addRecoveryListener(channelRecoveryListener);
453-
}
454-
else {
455-
recoveryLatch.countDown(); // Spring IO Platform Tests
456-
}
457-
String tag = channel.basicConsume(route, false, "testHardErrorAndReconnect",
458-
new DefaultConsumer(channel));
459-
// Consume twice with the same tag is a hard error (connection will be reset)
460-
String result = channel.basicConsume(route, false, tag, new DefaultConsumer(channel));
461-
fail("Expected IOException, got: " + result);
462-
return null;
463-
});
464-
fail("Expected AmqpIOException");
465-
}
466-
catch (AmqpException e) {
467-
// expected
468-
}
469-
assertTrue(recoveryLatch.await(10, TimeUnit.SECONDS));
470-
if (logger.isDebugEnabled()) {
471-
logger.debug("Resuming test after recovery complete");
472-
}
473-
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
474-
verify(cfLogger, atLeastOnce()).debug(captor.capture());
475-
assertThat(captor.getValue(), containsString("Connection recovery complete:"));
476-
template.convertAndSend(route, "message");
477-
assertTrue(latch.await(10, TimeUnit.SECONDS));
478-
String result = (String) template.receiveAndConvert(route);
479-
assertEquals("message", result);
480-
result = (String) template.receiveAndConvert(route);
481-
assertEquals(null, result);
482-
}
483-
484370
@Test
485371
public void testConnectionCloseLog() {
486372
Log logger = spy(TestUtils.getPropertyValue(this.connectionFactory, "logger", Log.class));

0 commit comments

Comments
 (0)