Description
In what version(s) of Spring AMQP are you seeing this issue?
3.0.5 and 2.4.9
Describe the bug
When you declare a queue manually using RabbitAdmin, the documentation leads you to expect that it will be redeclared on reconnection if setRedeclareManualDeclarations(true) is applied to the RabbitAdmin instance.
However, when using SimpleMessageListenerContainer or DirectMessageListenerContainer, this only happens if at least one Queue is declared as a bean.
AbstractMessageListenerContainer#attemptDeclarations
    private void attemptDeclarations(AmqpAdmin admin) {
        ApplicationContext context = this.getApplicationContext();
        if (context != null) {
            Set<String> queueNames = getQueueNamesAsSet();
            Map<String, Queue> queueBeans = context.getBeansOfType(Queue.class);
            for (Entry<String, Queue> entry : queueBeans.entrySet()) {
                Queue queue = entry.getValue();
                if (isMismatchedQueuesFatal() || (queueNames.contains(queue.getName()) &&
                        admin.getQueueProperties(queue.getName()) == null)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Redeclaring context exchanges, queues, bindings.");
                    }
                    admin.initialize();
                    break;
                }
            }
        }
    }
The code above only calls admin.initialize(), and so only reaches the manual redeclaration logic in the AmqpAdmin implementation, if there is at least one bean of type Queue in the application context.
If no Queue beans are declared, manually declared anonymous queues are not recovered after a reconnection event (for example, when a member of your RabbitMQ cluster restarts).
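To make the condition easier to see, here is a minimal stand-alone model of the decision in attemptDeclarations. It is only a sketch: the class AttemptDeclarationsModel, the method wouldInitialize, and the existsOnBroker predicate are hypothetical names used for illustration and are not part of Spring AMQP. Queue beans are modelled as a map from bean name to queue name.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of the decision made in attemptDeclarations; not Spring AMQP code.
final class AttemptDeclarationsModel {

    // Returns true when the modelled container would call admin.initialize().
    // queueBeans maps bean name -> queue name (stands in for getBeansOfType(Queue.class)).
    // existsOnBroker stands in for admin.getQueueProperties(name) != null.
    static boolean wouldInitialize(Map<String, String> queueBeans,
            Set<String> containerQueueNames,
            boolean mismatchedQueuesFatal,
            Predicate<String> existsOnBroker) {
        for (String queueName : queueBeans.values()) {
            if (mismatchedQueuesFatal
                    || (containerQueueNames.contains(queueName) && !existsOnBroker.test(queueName))) {
                return true;
            }
        }
        // With no Queue beans the loop body never runs, so initialize() is never reached.
        return false;
    }

    public static void main(String[] args) {
        Set<String> containerQueues = Set.of("manually-declared-queue");
        Predicate<String> nothingOnBroker = name -> false; // queue lost after the restart

        // No Queue beans: the manual queue is missing, but nothing triggers redeclaration.
        System.out.println("no beans: "
                + wouldInitialize(Map.of(), containerQueues, true, nothingOnBroker));

        // Same queue registered as a bean: redeclaration is triggered.
        System.out.println("with bean: "
                + wouldInitialize(Map.of("topicBean", "manually-declared-queue"),
                        containerQueues, true, nothingOnBroker));
    }
}
```

In this model the first call returns false regardless of the other arguments, which mirrors the behaviour described above: the flag on RabbitAdmin is never consulted because initialize() is not called.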
To Reproduce
Manually declare an AnonymousQueue bound to a topic exchange on a RabbitMQ cluster.
Restart each member of the cluster.
The topic consumer fails to rebind because the anonymous queue is never redeclared.
This issue doesn't occur with durable quorum queues; those rebind correctly.
Example test case:
- binds an anonymous topic consumer
- sends 500 messages
- uses kubectl to restart each member of a rabbitmq statefulset
- sends 500 more messages (logs lots of failed reconnection attempts and eventually times out).
`
@Test
public void testRebind() {
    AtomicLong received = new AtomicLong(0L);
    AtomicLong sent = new AtomicLong(0L);
    long max = 1000;
    String exchangeName = "test-exchange";
    String wildcardRoutingKey = "#";

    // Declared manually through RabbitAdmin; the queue is never registered as a bean.
    Queue myQueue = QueueBuilder.nonDurable().autoDelete().build();
    TopicExchange topicExchange = new TopicExchange(exchangeName, true, false);
    Binding myBinding = BindingBuilder.bind(myQueue).to(topicExchange).with(wildcardRoutingKey);
    rabbitAdmin.declareExchange(topicExchange);
    rabbitAdmin.declareQueue(myQueue);
    rabbitAdmin.declareBinding(myBinding);

    DirectMessageListenerContainer container = new DirectMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setAutoStartup(true);
    container.setApplicationContext(applicationContext);
    container.setAmqpAdmin(rabbitAdmin);
    container.setConsumersPerQueue(1);
    container.setFailedDeclarationRetryInterval(3000);
    container.setPrefetchCount(1);
    container.setMissingQueuesFatal(true);
    container.setMismatchedQueuesFatal(true);
    container.setTaskExecutor(getTaskExecutor("test-topic-consumer", 1));
    container.setQueues(myQueue);
    container.setDefaultRequeueRejected(true);
    container.setMessageListener(message -> received.incrementAndGet());
    container.start();

    // Send the first half of the messages before the restart.
    for (int i = 0; i < (max / 2); i++) {
        rabbitTemplate.convertAndSend(exchangeName, "any", "testmessage-" + sent.get());
        sent.incrementAndGet();
    }

    // Restart every member of the RabbitMQ StatefulSet and wait for the rollout to finish.
    ProcessBuilder builder = new ProcessBuilder();
    try {
        Process proc = builder.command("kubectl", "rollout", "restart", "sts/rabbitmq-server", "-n", "verne").start();
        Thread.sleep(1000);
        log.error(new String(proc.getInputStream().readAllBytes()));
        log.error(new String(proc.getErrorStream().readAllBytes()));
        Process proc2 = builder.command("kubectl", "rollout", "status", "sts/rabbitmq-server", "-n", "verne").start();
        log.error(new String(proc2.getInputStream().readAllBytes()));
        log.error(new String(proc2.getErrorStream().readAllBytes()));
    } catch (IOException | InterruptedException e) {
        throw new RuntimeException(e);
    }

    // Send the second half of the messages after the restart.
    log.info("Finished rabbit restart, sending new messages");
    while (sent.get() < max) {
        rabbitTemplate.convertAndSend(exchangeName, "any", "testmessage-" + sent.get());
        sent.incrementAndGet();
    }

    // Wait until every message is received or the timeout expires.
    long timeout = 600000L;
    long start = System.currentTimeMillis();
    while (received.get() < max) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            //throw new RuntimeException(e);
        }
        assertTrue(System.currentTimeMillis() - start < timeout,
                "Timeout waiting for all messages - received " + received.get() + " of " + sent.get());
    }
    assertEquals(sent.get(), received.get());
}
`
The test passes if you declare the anonymous queue as a bean, like so:
`
@Test
public void testRebindSuccess() {
    AutowireCapableBeanFactory factory = applicationContext.getAutowireCapableBeanFactory();
    BeanDefinitionRegistry registry = (BeanDefinitionRegistry) factory;
    AtomicLong received = new AtomicLong(0L);
    AtomicLong sent = new AtomicLong(0L);
    long max = 1000;
    String exchangeName = "test-exchange";
    String wildcardRoutingKey = "#";

    // Register the anonymous queue as a bean so the container finds a Queue bean on reconnection.
    BeanDefinitionBuilder b =
            BeanDefinitionBuilder.rootBeanDefinition(AnonymousQueue.class);
    registry.registerBeanDefinition("topicBean", b.getBeanDefinition());
    Queue myQueue = factory.getBean("topicBean", AnonymousQueue.class);
    TopicExchange topicExchange = new TopicExchange(exchangeName, true, false);
    Binding myBinding = BindingBuilder.bind(myQueue).to(topicExchange).with(wildcardRoutingKey);
    rabbitAdmin.declareExchange(topicExchange);
    rabbitAdmin.declareQueue(myQueue);
    rabbitAdmin.declareBinding(myBinding);

    DirectMessageListenerContainer container = new DirectMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setAutoStartup(true);
    container.setApplicationContext(applicationContext);
    container.setAmqpAdmin(rabbitAdmin);
    container.setConsumersPerQueue(1);
    container.setFailedDeclarationRetryInterval(3000);
    container.setPrefetchCount(1);
    container.setMissingQueuesFatal(true);
    container.setMismatchedQueuesFatal(true);
    container.setTaskExecutor(getTaskExecutor("test-topic-consumer", 1));
    container.setQueues(myQueue);
    container.setDefaultRequeueRejected(true);
    container.setMessageListener(message -> received.incrementAndGet());
    container.start();

    // Send the first half of the messages before the restart.
    for (int i = 0; i < (max / 2); i++) {
        rabbitTemplate.convertAndSend(exchangeName, "any", "testmessage-" + sent.get());
        sent.incrementAndGet();
    }

    // Restart every member of the RabbitMQ StatefulSet and wait for the rollout to finish.
    ProcessBuilder builder = new ProcessBuilder();
    try {
        Process proc = builder.command("kubectl", "rollout", "restart", "sts/rabbitmq-server", "-n", "verne").start();
        Thread.sleep(1000);
        log.error(new String(proc.getInputStream().readAllBytes()));
        log.error(new String(proc.getErrorStream().readAllBytes()));
        Process proc2 = builder.command("kubectl", "rollout", "status", "sts/rabbitmq-server", "-n", "verne").start();
        log.error(new String(proc2.getInputStream().readAllBytes()));
        log.error(new String(proc2.getErrorStream().readAllBytes()));
    } catch (IOException | InterruptedException e) {
        throw new RuntimeException(e);
    }

    // Send the second half of the messages after the restart.
    log.info("Finished rabbit restart, sending new messages");
    while (sent.get() < max) {
        rabbitTemplate.convertAndSend(exchangeName, "any", "testmessage-" + sent.get());
        sent.incrementAndGet();
    }

    // Wait until every message is received or the timeout expires.
    long timeout = 600000L;
    long start = System.currentTimeMillis();
    while (received.get() < max) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            //throw new RuntimeException(e);
        }
        assertTrue(System.currentTimeMillis() - start < timeout,
                "Timeout waiting for all messages - received " + received.get() + " of " + sent.get());
    }
    assertEquals(sent.get(), received.get());
}
`
Expected behavior
AbstractMessageListenerContainer should allow manually declared queues to be redeclared, either by checking whether redeclareManualDeclarations is set on the AmqpAdmin or by deferring the redeclaration logic to the AmqpAdmin.
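As a rough illustration of the first option, the stand-alone model below adds a second rule to the decision: when the admin is flagged to redeclare manual declarations, a missing container queue triggers initialization even if no Queue beans exist. This is a sketch of one possible reading of the request, not the Spring AMQP implementation; ProposedRedeclareModel, wouldInitialize, the redeclareManualDeclarations parameter, and existsOnBroker are hypothetical names.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of the requested behaviour; not Spring AMQP code.
final class ProposedRedeclareModel {

    // Returns true when the modelled container would call admin.initialize().
    static boolean wouldInitialize(Map<String, String> queueBeans,
            Set<String> containerQueueNames,
            boolean mismatchedQueuesFatal,
            boolean redeclareManualDeclarations,
            Predicate<String> existsOnBroker) {
        // Existing rule: driven only by Queue beans in the application context.
        for (String queueName : queueBeans.values()) {
            if (mismatchedQueuesFatal
                    || (containerQueueNames.contains(queueName) && !existsOnBroker.test(queueName))) {
                return true;
            }
        }
        // Assumed extra rule: honour the admin's manual-redeclaration flag
        // for container queues that are missing on the broker.
        if (redeclareManualDeclarations) {
            for (String queueName : containerQueueNames) {
                if (!existsOnBroker.test(queueName)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> containerQueues = Set.of("manually-declared-queue");
        Predicate<String> nothingOnBroker = name -> false; // queue lost after the restart

        System.out.println("flag off, no beans: "
                + wouldInitialize(Map.of(), containerQueues, true, false, nothingOnBroker));
        System.out.println("flag on, no beans: "
                + wouldInitialize(Map.of(), containerQueues, true, true, nothingOnBroker));
    }
}
```

The second option, deferring entirely to the AmqpAdmin, would instead move this decision out of the container, so the container would not need to know about the flag at all.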
Sample
https://github.com/mikee/spring-rabbit-topic-rebind-issue