SimpleMessageListenerContainer: repeat ack when batch listen enabled #1406

Closed
@zysaaa

        if (this.consumerBatchEnabled) {
            Collection<MessagePostProcessor> afterReceivePostProcessors = getAfterReceivePostProcessors();
            if (afterReceivePostProcessors != null) {
                Message original = message;
                deliveryTag = message.getMessageProperties().getDeliveryTag();
                for (MessagePostProcessor processor : getAfterReceivePostProcessors()) {
                    message = processor.postProcessMessage(message);
                    if (message == null) {
                        channel.basicAck(deliveryTag, false);
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug(
                                    "Message Post Processor returned 'null', discarding message " + original);
                        }
                        break;
                    }
                }
            }

When MessagePostProcessor#postProcessMessage returns null, the original message is acked immediately. But the subsequent consumer.commitIfNecessary(isChannelLocallyTransacted(), getMessageAckListener()) call also performs a multiple ack, which can result in a repeated ack and raise an exception:

DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received shutdown signal for consumer tag=amq.ctag-X4tiGLDLmy4f0BH8pJfINg
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)

It doesn't seem necessary to ack here, since consumer.commitIfNecessary will do a multiple ack anyway.
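
To make the sequence concrete, here is a minimal sketch that replays it against a toy model of delivery-tag bookkeeping. ModelChannel and DoubleAckSketch are hypothetical names and are not part of Spring AMQP or the RabbitMQ client. The model assumes the broker rejects any ack whose tag is no longer outstanding, as the log above suggests, and that the batch commit issues a multiple ack on the last delivery tag of the batch.

```java
import java.util.TreeSet;

// Hypothetical, simplified stand-in for a broker channel; not the RabbitMQ client API.
class ModelChannel {
    // Delivery tags that were delivered and not yet acknowledged.
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long tag) {
        unacked.add(tag);
    }

    // Mirrors the shape of basicAck(deliveryTag, multiple).
    // Assumption: acking a tag that is no longer outstanding is an error.
    void basicAck(long tag, boolean multiple) {
        if (!unacked.contains(tag)) {
            throw new IllegalStateException("PRECONDITION_FAILED - unknown delivery tag " + tag);
        }
        if (multiple) {
            // Ack every outstanding tag up to and including this one.
            unacked.headSet(tag, true).clear();
        } else {
            unacked.remove(tag);
        }
    }
}

class DoubleAckSketch {
    // Replays a two-message batch in which the post processor discards message 2.
    // ackDiscarded = true models the individual ack in the snippet above.
    static String run(boolean ackDiscarded) {
        ModelChannel channel = new ModelChannel();
        channel.deliver(1L);
        channel.deliver(2L);
        if (ackDiscarded) {
            channel.basicAck(2L, false); // individual ack for the discarded message
        }
        try {
            channel.basicAck(2L, true); // batch commit: multiple ack on the last tag
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // individual ack, then multiple ack
        System.out.println(run(false)); // multiple ack only
    }
}
```

In this model, run(true) fails with the same "unknown delivery tag 2" message as the log, while run(false) commits cleanly, which is why dropping the individual ack looks sufficient.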

Test code:

        SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer(connectionFactory);
        smlc.addQueueNames("testq");
        smlc.setAfterReceivePostProcessors(new MessagePostProcessor() {
            int index = 0;
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                if (index++ >= 1) {
                    return null;
                } else {
                    return message;
                }
            }
        });
        smlc.setBatchSize(10);
        smlc.setConsumerBatchEnabled(true);
        smlc.setMessageListener(new BatchMessageListener() {
            @Override
            public void onMessageBatch(List<Message> messages) {
                System.out.println("I got: " + messages.size());
            }
        });
