DirectMessageListenerContainer fails to recover consuming after node fail #1034

@bendyna

Version 2.1.7.RELEASE

If a node in a RabbitMQ cluster fails, DirectMessageListenerContainer does not reconnect to another node. The Docker setup I used to reproduce this is attached.
docker.zip

pom.xml

    ...
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
    </dependencies>
    ...

Application.java

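import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication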
public class Application {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ConnectionFactory connectionFactory) {
        return args -> {
            String queueName = "queue";

            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.declareQueue(new Queue(queueName));

            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            for (int i = 0; i < 10000; i++) {
                rabbitTemplate.send(queueName, new Message(Integer.toString(i).getBytes(), new MessageProperties()));
            }
            logger.info("Messages were sent");

            try {
                Thread.sleep(10000); // wait for queue mirroring
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
//            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames(queueName);
            container.setPrefetchCount(1);
            container.setMessageListener(message -> {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            container.start();
        };
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("localhost:5672,localhost:5673,localhost:5674");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("test");
        return connectionFactory;
    }

}

Steps to reproduce (with docker):

  1. Run docker-compose: "docker-compose -f rabbit-ha.yml up -d"
  2. Wait until the RabbitMQ cluster has started and the management UI is available on localhost:15673
  3. Run the Spring Boot application
  4. Wait until consuming has started
  5. The client usually connects to the first address, localhost:5672, so stop the first node: "docker-compose -f rabbit-ha.yml stop rabbitmq1"
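
Step 2 can be scripted when the reproduction has to be repeated many times. The following is a minimal sketch that polls the management port from the steps above until it accepts a TCP connection. It assumes that an accepted connection is a good enough readiness signal, which does not guarantee that every node has joined the cluster. `PortWaiter` and `waitForPort` are hypothetical names introduced for illustration, not part of Spring AMQP or RabbitMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: waits until a TCP port accepts connections. */
final class PortWaiter {

    /**
     * Returns true as soon as host:port accepts a TCP connection,
     * or false if that has not happened within timeoutMillis.
     */
    static boolean waitForPort(String host, int port, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        do {
            try (Socket socket = new Socket()) {
                // Assumption: 1 second is enough for a local connect attempt.
                socket.connect(new InetSocketAddress(host, port), 1000);
                return true;
            } catch (IOException e) {
                // Port not open yet; back off briefly before retrying.
                Thread.sleep(200);
            }
        } while (System.nanoTime() - deadline < 0);
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // 15673 is the management port used in the steps above.
        boolean up = waitForPort("localhost", 15673, 60_000);
        System.out.println(up ? "management port is open" : "timed out");
    }
}
```

Run it between steps 1 and 3; a `false` result only means the port did not open in time.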

I tried this 10 times. In 7 of the 10 runs the DMLC was unable to resume consuming. Log:

2019-06-25 17:58:51.894 ERROR 4951 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
2019-06-25 17:58:51.895  INFO 4951 --- [ool-1-thread-12] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672, localhost:5673, localhost:5674]
2019-06-25 17:58:51.896  WARN 4951 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler     : An unexpected connection driver error occured (Exception message: Connection reset)
2019-06-25 17:58:51.899  INFO 4951 --- [ool-1-thread-12] o.s.a.r.c.CachingConnectionFactory       : Created new connection: connectionFactory#5ef5c734:1/SimpleConnection@17a9645f [delegate=amqp://guest@127.0.0.1:5673/test, localPort= 43392]
2019-06-25 17:58:51.902 ERROR 4951 --- [ool-1-thread-12] o.s.a.r.l.DirectMessageListenerContainer : Error acking

java.lang.IllegalStateException: Channel closed; cannot ack/nack
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1131) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.sun.proxy.$Proxy47.basicAck(Unknown Source) ~[na:na]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleAck(DirectMessageListenerContainer.java:1063) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:997) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]

In the other 3 runs the DMLC reconnected and resumed consuming. Log:

2019-06-25 17:55:39.224 ERROR 921 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
2019-06-25 17:55:39.347 ERROR 921 --- [nsumerMonitor-1] o.s.a.r.l.DirectMessageListenerContainer : Consumer canceled - channel closed SimpleConsumer [queue=queue, consumerTag=amq.ctag-PRL6yM5060BqWCnVpmMU2g identity=66a4c32a]
2019-06-25 17:55:44.347  INFO 921 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672, localhost:5673, localhost:5674]
2019-06-25 17:55:44.355  INFO 921 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: connectionFactory#2c78d320:1/SimpleConnection@1a526f2d [delegate=amqp://guest@127.0.0.1:5673/test, localPort= 43252]
2019-06-25 17:55:44.370  INFO 921 --- [nsumerMonitor-1] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=queue, consumerTag=amq.ctag-_FDFe2xQt82rn5lPbaN-Gg identity=3d46acf5] started

I also ran the same steps 10 times with SimpleMessageListenerContainer. The SMLC resumed consuming in all 10 runs.
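
To tally outcomes over repeated runs without reading each log by hand, the two logs above can be told apart mechanically: the recovering run contains a `DirectMessageListenerContainer` line ending in `] started` after the `Channel shutdown: connection error` line, and the failing run does not. The following is a minimal sketch of that check. `FailoverLogClassifier` and its `Outcome` values are hypothetical names introduced for illustration, and the marker substrings are assumptions taken only from the two logs in this report, so other versions or log patterns may need different markers.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

/** Hypothetical helper: classifies one captured client log of a failover run. */
final class FailoverLogClassifier {

    enum Outcome { NO_FAILOVER, RECOVERED, STUCK }

    // Marker substrings copied from the logs in this report (assumptions).
    private static final String SHUTDOWN_MARKER = "Channel shutdown: connection error";
    private static final String CONTAINER_MARKER = "DirectMessageListenerContainer";
    private static final String STARTED_SUFFIX = "] started";

    static Outcome classify(List<String> logLines) {
        boolean shutdownSeen = false;
        for (String line : logLines) {
            if (line.contains(SHUTDOWN_MARKER)) {
                shutdownSeen = true;
            } else if (shutdownSeen
                    && line.contains(CONTAINER_MARKER)
                    && line.trim().endsWith(STARTED_SUFFIX)) {
                // A consumer was started again after the connection was lost.
                return Outcome.RECOVERED;
            }
        }
        // STUCK only means no restart line appeared in the captured lines.
        return shutdownSeen ? Outcome.STUCK : Outcome.NO_FAILOVER;
    }

    public static void main(String[] args) throws Exception {
        // Usage: java FailoverLogClassifier <path-to-captured-log>
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(classify(lines));
    }
}
```

Capture the application log for a fixed period after stopping the node, then run the classifier on each file. A log cut off too early would be reported as `STUCK` even if the consumer restarted later.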
