Skip to content

Commit 07443ce

Browse files
zysaaagaryrussell
authored andcommitted
GH-1401: SMLC: Fix setConcurrency
Resolves #1401 - multiple calls can temporarly inflate consumer count
1 parent 3f003cf commit 07443ce

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,6 @@ public void setConcurrency(String concurrency) {
223223
int maxConsumersToSet = Integer.parseInt(concurrency.substring(separatorIndex + 1));
224224
Assert.isTrue(maxConsumersToSet >= consumersToSet,
225225
"'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
226-
this.concurrentConsumers = 1;
227226
this.maxConcurrentConsumers = null;
228227
setConcurrentConsumers(consumersToSet);
229228
setMaxConcurrentConsumers(maxConsumersToSet);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -69,6 +69,7 @@
6969
import org.springframework.amqp.core.AnonymousQueue;
7070
import org.springframework.amqp.core.Message;
7171
import org.springframework.amqp.core.MessageBuilder;
72+
import org.springframework.amqp.core.MessageListener;
7273
import org.springframework.amqp.core.MessagePostProcessor;
7374
import org.springframework.amqp.core.Queue;
7475
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@@ -650,6 +651,31 @@ public Message postProcessMessage(Message message) throws AmqpException {
650651
assertThat(afterReceivePostProcessors).containsExactly(mpp2, mpp3);
651652
}
652653

654+
@Test
655+
void setConcurrency() throws Exception {
656+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
657+
Connection connection = mock(Connection.class);
658+
Channel channel = mock(Channel.class);
659+
given(connectionFactory.createConnection()).willReturn(connection);
660+
given(connection.createChannel(false)).willReturn(channel);
661+
final AtomicReference<Consumer> consumer = new AtomicReference<>();
662+
willAnswer(invocation -> {
663+
consumer.set(invocation.getArgument(6));
664+
consumer.get().handleConsumeOk("1");
665+
return "1";
666+
}).given(channel)
667+
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
668+
any(Consumer.class));
669+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
670+
container.setQueueNames("foo", "bar");
671+
container.setMessageListener(mock(MessageListener.class));
672+
container.setConcurrency("5-10");
673+
container.start();
674+
await().until(() -> TestUtils.getPropertyValue(container, "consumers", Collection.class).size() == 5);
675+
container.setConcurrency("10-10");
676+
assertThat(TestUtils.getPropertyValue(container, "consumers", Collection.class)).hasSize(10);
677+
}
678+
653679
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
654680
final boolean cancel, final CountDownLatch latch) {
655681
return invocation -> {

0 commit comments

Comments
 (0)