Skip to content

Commit 9e45320

Browse files
garyrussellartembilan
authored andcommitted
GH-1246: SMLC: Fix addQueueNames
Resolves #1246 - called `queuesChanged()` twice - didn't wait for old consumers to exit, causing problems with exclusive consumers
1 parent 637c1ed commit 9e45320

File tree

2 files changed

+19
-5
lines changed

2 files changed

+19
-5
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,7 @@ public void setQueueNames(String... queueName) {
399399
*/
400400
@Override
401401
public void addQueueNames(String... queueName) {
402-
super.addQueueNames(queueName);
403-
queuesChanged();
402+
super.addQueueNames(queueName); // calls addQueues() which will cycle consumers
404403
}
405404

406405
/**
@@ -801,7 +800,13 @@ private void queuesChanged() {
801800
consumerIterator.remove();
802801
count++;
803802
}
804-
this.addAndStartConsumers(count);
803+
try {
804+
this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
805+
}
806+
catch (InterruptedException e) {
807+
Thread.currentThread().interrupt();
808+
}
809+
addAndStartConsumers(count);
805810
}
806811
}
807812
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.mockito.ArgumentMatchers.any;
2323
import static org.mockito.ArgumentMatchers.anyBoolean;
2424
import static org.mockito.BDDMockito.given;
25+
import static org.mockito.BDDMockito.willAnswer;
2526
import static org.mockito.BDDMockito.willReturn;
2627
import static org.mockito.Mockito.atLeastOnce;
2728
import static org.mockito.Mockito.spy;
@@ -135,14 +136,21 @@ public void clear() throws Exception {
135136
@Test
136137
public void testChangeQueues() throws Exception {
137138
CountDownLatch latch = new CountDownLatch(30);
138-
container =
139-
createContainer(new MessageListenerAdapter(new PojoListener(latch)), queue.getName(), queue1.getName());
139+
AtomicInteger restarts = new AtomicInteger();
140+
container = spy(createContainer(new MessageListenerAdapter(new PojoListener(latch)), false, queue.getName(),
141+
queue1.getName()));
142+
willAnswer(invocation -> {
143+
restarts.incrementAndGet();
144+
invocation.callRealMethod();
145+
return null;
146+
}).given(container).addAndStartConsumers(1);
140147
final CountDownLatch consumerLatch = new CountDownLatch(1);
141148
this.container.setApplicationEventPublisher(e -> {
142149
if (e instanceof AsyncConsumerStoppedEvent) {
143150
consumerLatch.countDown();
144151
}
145152
});
153+
this.container.start();
146154
for (int i = 0; i < 10; i++) {
147155
template.convertAndSend(queue.getName(), i + "foo");
148156
template.convertAndSend(queue1.getName(), i + "foo");
@@ -156,6 +164,7 @@ public void testChangeQueues() throws Exception {
156164
assertThat(waited).as("Timed out waiting for message").isTrue();
157165
assertThat(template.receiveAndConvert(queue.getName())).isNull();
158166
assertThat(template.receiveAndConvert(queue1.getName())).isNull();
167+
assertThat(restarts.get()).isEqualTo(1);
159168
}
160169

161170
@Test

0 commit comments

Comments
 (0)