Skip to content

Commit 270f62d

Browse files
garyrussellartembilan
authored andcommitted
GH-1406: Fix Possible Double Ack in Consumer Batch
Resolves #1406 Previously, if an MPP caused the last record in an ack to be skipped two acks for the same deliveryTag would be sent causine a channel shutdown. There is no need to ack skipped records because the entire batch is acked (with basicAck multiple=true). **cherry-pick to 2.3.x, 2.2.x**
1 parent cd429dc commit 270f62d

File tree

2 files changed

+48
-1
lines changed

2 files changed

+48
-1
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -937,7 +937,6 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
937937
for (MessagePostProcessor processor : getAfterReceivePostProcessors()) {
938938
message = processor.postProcessMessage(message);
939939
if (message == null) {
940-
channel.basicAck(deliveryTag, false);
941940
if (this.logger.isDebugEnabled()) {
942941
this.logger.debug(
943942
"Message Post Processor returned 'null', discarding message " + original);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,18 @@
2626
import static org.mockito.ArgumentMatchers.anyLong;
2727
import static org.mockito.ArgumentMatchers.anyMap;
2828
import static org.mockito.ArgumentMatchers.anyString;
29+
import static org.mockito.ArgumentMatchers.eq;
2930
import static org.mockito.BDDMockito.given;
3031
import static org.mockito.BDDMockito.willAnswer;
3132
import static org.mockito.BDDMockito.willReturn;
3233
import static org.mockito.BDDMockito.willThrow;
3334
import static org.mockito.Mockito.atLeastOnce;
3435
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.never;
3537
import static org.mockito.Mockito.spy;
3638
import static org.mockito.Mockito.times;
3739
import static org.mockito.Mockito.verify;
40+
import static org.mockito.Mockito.verifyNoMoreInteractions;
3841

3942
import java.io.IOException;
4043
import java.net.URL;
@@ -67,6 +70,7 @@
6770
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
6871
import org.springframework.amqp.core.AcknowledgeMode;
6972
import org.springframework.amqp.core.AnonymousQueue;
73+
import org.springframework.amqp.core.BatchMessageListener;
7074
import org.springframework.amqp.core.Message;
7175
import org.springframework.amqp.core.MessageBuilder;
7276
import org.springframework.amqp.core.MessageListener;
@@ -651,6 +655,7 @@ public Message postProcessMessage(Message message) throws AmqpException {
651655
assertThat(afterReceivePostProcessors).containsExactly(mpp2, mpp3);
652656
}
653657

658+
@SuppressWarnings("unchecked")
654659
@Test
655660
void setConcurrency() throws Exception {
656661
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
@@ -676,6 +681,49 @@ void setConcurrency() throws Exception {
676681
assertThat(TestUtils.getPropertyValue(container, "consumers", Collection.class)).hasSize(10);
677682
}
678683

684+
@Test
685+
void filterMppNoDoubleAck() throws Exception {
686+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
687+
Connection connection = mock(Connection.class);
688+
Channel channel = mock(Channel.class);
689+
given(connectionFactory.createConnection()).willReturn(connection);
690+
given(connection.createChannel(false)).willReturn(channel);
691+
final AtomicReference<Consumer> consumer = new AtomicReference<>();
692+
willAnswer(invocation -> {
693+
consumer.set(invocation.getArgument(6));
694+
consumer.get().handleConsumeOk("1");
695+
return "1";
696+
}).given(channel)
697+
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
698+
any(Consumer.class));
699+
final CountDownLatch latch = new CountDownLatch(1);
700+
willAnswer(invocation -> {
701+
latch.countDown();
702+
return null;
703+
}).given(channel).basicAck(anyLong(), anyBoolean());
704+
705+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
706+
container.setAfterReceivePostProcessors(msg -> null);
707+
container.setQueueNames("foo");
708+
MessageListener listener = mock(BatchMessageListener.class);
709+
container.setMessageListener(listener);
710+
container.setBatchSize(2);
711+
container.setConsumerBatchEnabled(true);
712+
container.start();
713+
BasicProperties props = new BasicProperties();
714+
byte[] payload = "baz".getBytes();
715+
Envelope envelope = new Envelope(1L, false, "foo", "bar");
716+
consumer.get().handleDelivery("1", envelope, props, payload);
717+
envelope = new Envelope(2L, false, "foo", "bar");
718+
consumer.get().handleDelivery("1", envelope, props, payload);
719+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
720+
verify(channel, never()).basicAck(eq(1), anyBoolean());
721+
verify(channel).basicAck(2, true);
722+
container.stop();
723+
verify(listener).containerAckMode(AcknowledgeMode.AUTO);
724+
verifyNoMoreInteractions(listener);
725+
}
726+
679727
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
680728
final boolean cancel, final CountDownLatch latch) {
681729
return invocation -> {

0 commit comments

Comments
 (0)