Inconvertible Messages are silently discarded when using the Rabbit Binder with Consumer Side Batching #2986

Closed
@hgarus

Describe the issue
When using the RabbitMQ binder with a consumer configured for consumer-side batching as described here, a received message that cannot be converted to the target type is silently dropped.

To Reproduce

  1. Set up a local RabbitMQ instance.
  2. Set up a simple Spring Boot app using the Rabbit binder:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2023.0.3</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
  3. Configure the Rabbit consumer for batching:
# Not necessary for reproduction
spring.cloud.stream.rabbit.default.consumer.auto-bind-dlq=true
spring.cloud.stream.bindings.listener-in-0.destination=some-destination
spring.cloud.stream.bindings.listener-in-0.group=l
spring.cloud.stream.bindings.listener-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.listener-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.listener-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.listener-in-0.consumer.receive-timeout=100

# Useful to see what is actually happening (makes this less silent)
logging.level.org.springframework.cloud.function.context.config.JsonMessageConverter=debug
  4. Set up a consumer that consumes some DTO, which is sent as JSON:
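import java.util.List;
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class Demo1Application {

    public static void main(String[] args) {
        SpringApplication.run(Demo1Application.class, args);
    }

    public record MyDto(int i) {}

    // Batch consumer: receives the converted payloads of one batch as a list and prints them
    @Bean
    public Consumer<List<MyDto>> listener() {
        return System.out::println;
    }
}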
  5. Send a message which cannot be converted to MyDto:
    // Add this bean to Demo1Application. It needs imports for ApplicationRunner (org.springframework.boot),
    // AmqpTemplate and MessageBuilder (org.springframework.amqp.core), and java.nio.charset.StandardCharsets.
    @Bean
    public ApplicationRunner applicationRunner(AmqpTemplate amqpTemplate) {
        return args -> {
            // Send two test messages to the consumer: the first deserializes fine, the second does not
            amqpTemplate.send("some-destination", "", MessageBuilder
                    .withBody("{\"i\": 42}".getBytes(StandardCharsets.UTF_8))
                    .setContentType("application/json").build());
            amqpTemplate.send("some-destination", "", MessageBuilder
                    .withBody("{\"i\": \"A STRING\"}".getBytes(StandardCharsets.UTF_8))
                    .setContentType("application/json").build());
        };
    }
  6. Observe that only one message is printed to the console and the DLQ is empty:
[MyDto[i=42]]

Version of the framework
Spring Boot 3.3.2 and Spring Cloud 2023.0.3
Expected behavior
I would expect both messages to be DLQed instead. This would match the behaviour without batching, where an inconvertible Message is DLQed.
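
To make the difference between the observed and the expected behaviour concrete, here is a minimal plain-Java sketch. It is not the binder's conversion code: `BatchConversionSketch`, `convert`, `convertDropping`, and `convertStrict` are hypothetical names, and the regex-based parsing is only a stand-in for the JSON message converter. The first batch method mimics what the reproduction shows (the inconvertible payload vanishes), while the second surfaces the failure so that a caller could reject or dead-letter the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BatchConversionSketch {

    public record MyDto(int i) {}

    // Stand-in for the real JSON converter (assumption): accepts only {"i": <integer>}
    private static final Pattern INT_FIELD =
            Pattern.compile("\\{\\s*\"i\"\\s*:\\s*(-?\\d+)\\s*\\}");

    public static Optional<MyDto> convert(String payload) {
        Matcher m = INT_FIELD.matcher(payload);
        return m.matches()
                ? Optional.of(new MyDto(Integer.parseInt(m.group(1))))
                : Optional.empty();
    }

    // Observed behaviour, as reported: inconvertible payloads silently disappear from the batch
    public static List<MyDto> convertDropping(List<String> batch) {
        List<MyDto> result = new ArrayList<>();
        for (String payload : batch) {
            convert(payload).ifPresent(result::add);
        }
        return result;
    }

    // Expected behaviour: a conversion failure is raised instead of being swallowed
    public static List<MyDto> convertStrict(List<String> batch) {
        List<MyDto> result = new ArrayList<>();
        for (String payload : batch) {
            result.add(convert(payload).orElseThrow(
                    () -> new IllegalArgumentException("Cannot convert payload: " + payload)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("{\"i\": 42}", "{\"i\": \"A STRING\"}");
        System.out.println(convertDropping(batch)); // prints [MyDto[i=42]]
        try {
            convertStrict(batch);
        } catch (IllegalArgumentException e) {
            System.out.println("batch rejected: " + e.getMessage());
        }
    }
}
```

With the strict variant the failure is visible to whatever handles errors for the listener, which is the precondition for routing the affected messages to the DLQ.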

Additional context

The example is a bit artificial; a more realistic one would be an unexpected enum value.
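
The enum case can be sketched without any JSON library. `UnexpectedEnumSketch`, `Status`, `OrderDto`, and `fromStatusField` are hypothetical names, and `Enum.valueOf` only stands in for the converter's enum handling; the assumption is that the real converter likewise rejects a constant it does not know, for example after a producer starts sending a newly added value.

```java
public class UnexpectedEnumSketch {

    // Hypothetical enum as known to the consumer
    public enum Status { ACTIVE, INACTIVE }

    // Hypothetical DTO carrying the enum
    public record OrderDto(Status status) {}

    // Stand-in for deserializing the "status" field of an incoming message
    public static OrderDto fromStatusField(String rawStatus) {
        // Status.valueOf throws IllegalArgumentException for an unknown constant name
        return new OrderDto(Status.valueOf(rawStatus));
    }

    public static void main(String[] args) {
        System.out.println(fromStatusField("ACTIVE")); // prints OrderDto[status=ACTIVE]
        try {
            // The producer introduced a value the consumer does not know yet
            fromStatusField("ARCHIVED");
        } catch (IllegalArgumentException e) {
            System.out.println("inconvertible: " + e.getMessage());
        }
    }
}
```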

This might also apply to the Kafka binder; I haven't tested that.
