Description
Describe the bug
When we declare Rabbit Stream as a Queue
bean, we would expect that RabbitAdmin
bean would declare such an entity on the broker.
But this does not happen because there is no regular AMQP protocol interaction in the project.
The RabbitStreamTemplate
fails as:
com.rabbitmq.stream.StreamDoesNotExistException: Stream test.stream1 does not exist
at com.rabbitmq.stream.impl.ProducersCoordinator.getBrokerForProducer(ProducersCoordinator.java:112)
at com.rabbitmq.stream.impl.ProducersCoordinator.registerAgentTracker(ProducersCoordinator.java:83)
at com.rabbitmq.stream.impl.ProducersCoordinator.registerProducer(ProducersCoordinator.java:74)
at com.rabbitmq.stream.impl.StreamEnvironment.registerProducer(StreamEnvironment.java:577)
at com.rabbitmq.stream.impl.StreamProducer.<init>(StreamProducer.java:100)
at com.rabbitmq.stream.impl.StreamProducerBuilder.build(StreamProducerBuilder.java:175)
at org.springframework.rabbit.stream.producer.RabbitStreamTemplate.createOrGetProducer(RabbitStreamTemplate.java:94)
at org.springframework.rabbit.stream.producer.RabbitStreamTemplate.send(RabbitStreamTemplate.java:162)
at org.springframework.rabbit.stream.producer.RabbitStreamTemplate.convertAndSend(RabbitStreamTemplate.java:184)
at org.springframework.rabbit.stream.producer.RabbitStreamTemplate.convertAndSend(RabbitStreamTemplate.java:168)
See sample below.
To Reproduce
Comment out a RabbitListenerTests.nativeMsg()
test (or just its @Test
annotation) and the next queueOverAmqp()
will fail because the RabbitAdmin
didn't declare from the queue()
bean.
Expected behavior
Some interaction between RabbitAdmin
and com.rabbitmq.stream.Environment
to trigger .
Sample
@SpringJUnitConfig
@DirtiesContext
public class RabbitStreamTests {
private static final RabbitMQContainer RABBITMQ = new RabbitMQContainer("rabbitmq:management")
.withExposedPorts(5672, 15672, 5552)
.withEnv("RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS", "-rabbitmq_stream advertised_host localhost")
.withPluginsEnabled("rabbitmq_stream")
.withStartupTimeout(Duration.ofMinutes(2));
@Autowired
RabbitStreamTemplate rabbitStreamTemplate;
@BeforeAll
static void startContainer() {
RABBITMQ.start();
}
@Test
void testRabbitStream() {
assertThat(this.rabbitStreamTemplate.convertAndSend("test data").join()).isTrue();
}
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public ConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory(RABBITMQ.getAmqpPort());
}
@Bean
Environment rabbitStreamEnvironment() {
return Environment.builder()
.addressResolver(add -> new Address("localhost", RABBITMQ.getMappedPort(5552)))
.build();
}
@Bean
RabbitAdmin rabbitAdmin(ConnectionFactory rabbitConnectionFactory) {
return new RabbitAdmin(rabbitConnectionFactory);
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream1")
.stream()
.build();
}
@Bean
RabbitStreamTemplate streamTemplate(Environment env, Queue stream) {
return new RabbitStreamTemplate(env, stream.getName());
}
}
}
The workaround is like to mark RabbitAdmin
bean like this: @Bean(initMethod = "initialize")
. So, then entities are declared eagerly without waiting for AMQP connection to happen.