Skip to content
This repository was archived by the owner on Nov 20, 2024. It is now read-only.

Commit d95a1c1

Browse files
authored
GH-325: RabbitMQ Stream to Latest Snapshot
* Fix docs; set container bean name; fix stream name.
1 parent e60db26 commit d95a1c1

File tree

5 files changed

+34
-16
lines changed

5 files changed

+34
-16
lines changed

docs/src/main/asciidoc/overview.adoc

+5-2
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ Environment streamEnv() {
438438
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
439439
return (cont, dest, group) -> {
440440
StreamListenerContainer container = (StreamListenerContainer) cont;
441-
container.setConsumerCustomizer(builder -> {
441+
container.setConsumerCustomizer((name, builder) -> {
442442
builder.offset(OffsetSpecification.first());
443443
});
444444
// ...
@@ -447,7 +447,10 @@ ListenerContainerCustomizer<MessageListenerContainer> customizer() {
447447
----
448448
====
449449

450-
The stream name (for the purpose of offset tracking) is set to the binding `destination + '.' + group`.
450+
The `name` argument passed to the customizer is `destination + '.' + group + '.container'`.
451+
452+
The stream `name()` (for the purpose of offset tracking) is set to the binding `destination + '.' + group`.
453+
It can be changed using a `ConsumerCustomizer` shown above.
451454
If you decide to use manual offset tracking, the `Context` is available as a message header:
452455

453456
====

spring-cloud-stream-binder-rabbit/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
<dependency>
5959
<groupId>org.springframework.amqp</groupId>
6060
<artifactId>spring-rabbit-stream</artifactId>
61-
<version>2.4.0-M1</version>
61+
<version>2.4.0-SNAPSHOT</version>
6262
<optional>true</optional>
6363
</dependency>
6464
<dependency>

spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java

+1
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ private MessageListenerContainer createAndConfigureContainer(ConsumerDestination
511511
AbstractMessageListenerContainer listenerContainer = directContainer
512512
? new DirectMessageListenerContainer(this.connectionFactory)
513513
: new SimpleMessageListenerContainer(this.connectionFactory);
514+
listenerContainer.setBeanName(consumerDestination.getName() + "." + group + ".container");
514515
listenerContainer
515516
.setAcknowledgeMode(extension.getAcknowledgeMode());
516517
listenerContainer.setChannelTransacted(extension.isTransacted());

spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamContainerUtils.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@
2020
import java.nio.charset.StandardCharsets;
2121
import java.util.Map;
2222
import java.util.UUID;
23-
import java.util.function.Consumer;
2423
import java.util.function.Supplier;
2524

2625
import com.rabbitmq.stream.Codec;
27-
import com.rabbitmq.stream.ConsumerBuilder;
2826
import com.rabbitmq.stream.Environment;
2927
import com.rabbitmq.stream.MessageBuilder;
3028
import com.rabbitmq.stream.MessageBuilder.ApplicationPropertiesBuilder;
@@ -46,6 +44,7 @@
4644
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
4745
import org.springframework.lang.Nullable;
4846
import org.springframework.messaging.MessageHeaders;
47+
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
4948
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
5049
import org.springframework.rabbit.stream.support.StreamMessageProperties;
5150
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
@@ -82,15 +81,16 @@ public static MessageListenerContainer createContainer(ConsumerDestination consu
8281
StreamListenerContainer container = new StreamListenerContainer(applicationContext.getBean(Environment.class)) {
8382

8483
@Override
85-
public synchronized void setConsumerCustomizer(Consumer<ConsumerBuilder> consumerCustomizer) {
86-
super.setConsumerCustomizer(builder -> {
87-
builder.name(consumerDestination.getName());
88-
consumerCustomizer.accept(builder);
84+
public synchronized void setConsumerCustomizer(ConsumerCustomizer consumerCustomizer) {
85+
super.setConsumerCustomizer((id, builder) -> {
86+
builder.name(consumerDestination.getName() + "." + group);
87+
consumerCustomizer.accept(id, builder);
8988
});
9089
}
9190

9291

9392
};
93+
container.setBeanName(consumerDestination.getName() + "." + group + ".container");
9494
container.setMessageConverter(new DefaultStreamMessageConverter());
9595
return container;
9696
}

spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/stream/RabbitStreamBinderModuleTests.java

+21-7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.rabbitmq.stream.ConsumerBuilder;
2020
import com.rabbitmq.stream.Environment;
21+
import com.rabbitmq.stream.OffsetSpecification;
2122
import org.junit.jupiter.api.AfterEach;
2223
import org.junit.jupiter.api.Test;
2324

@@ -42,6 +43,7 @@
4243
import static org.assertj.core.api.Assertions.assertThat;
4344
import static org.mockito.BDDMockito.given;
4445
import static org.mockito.Mockito.mock;
46+
import static org.mockito.Mockito.verify;
4547

4648
/**
4749
* @author Gary Russell
@@ -59,7 +61,7 @@ public void tearDown() {
5961
}
6062

6163
@Test
62-
public void testExtendedProperties() {
64+
public void testStreamContainer() {
6365
context = new SpringApplicationBuilder(SimpleProcessor.class)
6466
.web(WebApplicationType.NONE)
6567
.run("--server.port=0");
@@ -72,26 +74,38 @@ public void testExtendedProperties() {
7274
new ExtendedConsumerProperties<RabbitConsumerProperties>(rProps);
7375
props.setAutoStartup(false);
7476
Binding<MessageChannel> binding = rabbitBinder.bindConsumer("testStream", "grp", new QueueChannel(), props);
75-
assertThat(TestUtils.getPropertyValue(binding, "lifecycle.messageListenerContainer"))
76-
.isInstanceOf(StreamListenerContainer.class);
77+
Object container = TestUtils.getPropertyValue(binding, "lifecycle.messageListenerContainer");
78+
assertThat(container).isInstanceOf(StreamListenerContainer.class);
79+
((StreamListenerContainer) container).start();
80+
verify(this.context.getBean(ConsumerBuilder.class)).offset(OffsetSpecification.first());
81+
((StreamListenerContainer) container).stop();
7782
}
7883

7984
@SpringBootApplication
8085
public static class SimpleProcessor {
8186

8287
@Bean
8388
public ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer() {
84-
return (c, q, g) -> ((StreamListenerContainer) c).setBeanName(
85-
"setByCustomizerForQueue:" + q + (g == null ? "" : ",andGroup:" + g));
89+
return (cont, dest, group) -> {
90+
StreamListenerContainer container = (StreamListenerContainer) cont;
91+
container.setConsumerCustomizer((name, builder) -> {
92+
builder.offset(OffsetSpecification.first());
93+
});
94+
};
8695
}
8796

8897
@Bean
89-
Environment env() {
98+
Environment env(ConsumerBuilder builder) {
9099
Environment env = mock(Environment.class);
91-
given(env.consumerBuilder()).willReturn(mock(ConsumerBuilder.class));
100+
given(env.consumerBuilder()).willReturn(builder);
92101
return env;
93102
}
94103

104+
@Bean
105+
ConsumerBuilder consumerBuilder() {
106+
return mock(ConsumerBuilder.class);
107+
}
108+
95109
}
96110

97111
}

0 commit comments

Comments
 (0)