Skip to content

Commit 6bc9f79

Browse files
committed
GH-1246: SMLC: Fix addQueueNames
Resolves #1246 - called `queuesChanged()` twice - didn't wait for old consumers to exit, causing problems with exclusive consumers
1 parent 647a7fa commit 6bc9f79

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,7 @@ public void setQueueNames(String... queueName) {
413413
*/
414414
@Override
415415
public void addQueueNames(String... queueName) {
416-
super.addQueueNames(queueName);
417-
queuesChanged();
416+
super.addQueueNames(queueName); // calls addQueues() which will cycle consumers
418417
}
419418

420419
/**
@@ -812,7 +811,13 @@ private void queuesChanged() {
812811
consumerIterator.remove();
813812
count++;
814813
}
815-
this.addAndStartConsumers(count);
814+
try {
815+
this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
816+
}
817+
catch (InterruptedException e) {
818+
Thread.currentThread().interrupt();
819+
}
820+
addAndStartConsumers(count);
816821
}
817822
}
818823
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.ArgumentMatchers.any;
2121
import static org.mockito.ArgumentMatchers.anyBoolean;
22+
import static org.mockito.BDDMockito.willAnswer;
2223
import static org.mockito.Mockito.atLeastOnce;
2324
import static org.mockito.Mockito.doReturn;
2425
import static org.mockito.Mockito.spy;
@@ -132,14 +133,21 @@ public void clear() throws Exception {
132133
@Test
133134
public void testChangeQueues() throws Exception {
134135
CountDownLatch latch = new CountDownLatch(30);
135-
container =
136-
createContainer(new MessageListenerAdapter(new PojoListener(latch)), queue.getName(), queue1.getName());
136+
AtomicInteger restarts = new AtomicInteger();
137+
container = spy(createContainer(new MessageListenerAdapter(new PojoListener(latch)), false, queue.getName(),
138+
queue1.getName()));
139+
willAnswer(invocation -> {
140+
restarts.incrementAndGet();
141+
invocation.callRealMethod();
142+
return null;
143+
}).given(container).addAndStartConsumers(1);
137144
final CountDownLatch consumerLatch = new CountDownLatch(1);
138145
this.container.setApplicationEventPublisher(e -> {
139146
if (e instanceof AsyncConsumerStoppedEvent) {
140147
consumerLatch.countDown();
141148
}
142149
});
150+
this.container.start();
143151
for (int i = 0; i < 10; i++) {
144152
template.convertAndSend(queue.getName(), i + "foo");
145153
template.convertAndSend(queue1.getName(), i + "foo");
@@ -153,6 +161,7 @@ public void testChangeQueues() throws Exception {
153161
assertThat(waited).as("Timed out waiting for message").isTrue();
154162
assertThat(template.receiveAndConvert(queue.getName())).isNull();
155163
assertThat(template.receiveAndConvert(queue1.getName())).isNull();
164+
assertThat(restarts.get()).isEqualTo(1);
156165
}
157166

158167
@Test

0 commit comments

Comments
 (0)