Skip to content

Commit fd74182

Browse files
committed
spring-atticGH-325: Initial Support for RabbitMQ Stream Plugin
Resolves spring-attic#325
1 parent 57b89c5 commit fd74182

File tree

12 files changed

+532
-40
lines changed

12 files changed

+532
-40
lines changed

docs/src/main/asciidoc/overview.adoc

+55
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ Default: none - the broker will generate random consumer tags.
164164
containerType::
165165
Select the type of listener container to be used.
166166
See https://docs.spring.io/spring-amqp/reference/html/_reference.html#choose-container[Choosing a Container] in the Spring AMQP documentation for more information.
167+
Also see <<rabbitmq-stream>>.
167168
+
168169
Default: `simple`
169170
deadLetterQueueName::
@@ -413,6 +414,60 @@ Not supported when the `containerType` is `direct`.
413414
+
414415
Default: `1`.
415416

417+
[[rabbitmq-stream]]
418+
=== Initial Support for the RabbitMQ Stream Plugin
419+
420+
Basic support for the https://rabbitmq.com/stream.html[RabbitMQ Stream Plugin] is now provided.
421+
422+
IMPORTANT: The consumer properties described above are not supported when you set the `containerType` property to `stream`; `concurrency` is also not supported at this time.
423+
424+
To configure the binder to use `containerType=stream`, you must add an `Environment` `@Bean` and, optionally, a customizer to customize the listener container.
425+
426+
====
427+
[source, java]
428+
----
429+
@Bean
430+
Environment streamEnv() {
431+
return Environment.builder()
432+
.build();
433+
}
434+
435+
@Bean
436+
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
437+
return (cont, dest, group) -> {
438+
StreamListenerContainer container = (StreamListenerContainer) cont;
439+
container.setConsumerCustomizer(builder -> {
440+
builder.offset(OffsetSpecification.first());
441+
});
442+
// ...
443+
};
444+
}
445+
----
446+
====
447+
448+
The stream name (for the purpose of offset tracking) is set to the binding destination.
449+
If you decide to use manual offset tracking, the `Context` is available as a message header:
450+
451+
====
452+
[source, java]
453+
----
454+
int count;
455+
456+
@Bean
457+
public Consumer<Message<?>> input() {
458+
return msg -> {
459+
System.out.println(msg);
460+
if (++count % 1000 == 0) {
461+
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
462+
context.consumer().store(context.offset());
463+
}
464+
};
465+
}
466+
----
467+
====
468+
469+
Refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/[RabbitMQ Stream Java Client documentation] for information about configuring the environment and consumer builder.
470+
416471
=== Advanced Listener Container Configuration
417472

418473
To set listener container properties that are not exposed as binder or binding properties, add a single bean of type `ListenerContainerCustomizer` to the application context.

spring-cloud-stream-binder-rabbit-core/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
<dependency>
2626
<groupId>org.springframework.integration</groupId>
2727
<artifactId>spring-integration-amqp</artifactId>
28+
<version>5.5.2</version>
2829
</dependency>
2930
<dependency>
3031
<groupId>org.springframework.boot</groupId>

spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/properties/RabbitConsumerProperties.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.springframework.amqp.core.AcknowledgeMode;
2222
import org.springframework.amqp.core.MessageDeliveryMode;
23-
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ContainerType;
2423
import org.springframework.util.Assert;
2524

2625
/**
@@ -348,4 +347,30 @@ public void setReceiveTimeout(Long receiveTimeout) {
348347
this.receiveTimeout = receiveTimeout;
349348
}
350349

350+
/**
351+
* Container type.
352+
* @author Gary Russell
353+
* @since 3.2
354+
*
355+
*/
356+
public enum ContainerType {
357+
358+
/**
359+
* Container where the RabbitMQ consumer dispatches messages to an invoker thread.
360+
*/
361+
SIMPLE,
362+
363+
/**
364+
* Container where the listener is invoked directly on the RabbitMQ consumer
365+
* thread.
366+
*/
367+
DIRECT,
368+
369+
/**
370+
* Container that uses the RabbitMQ Stream Client.
371+
*/
372+
STREAM
373+
374+
}
375+
351376
}

spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
5252
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties.QuorumConfig;
5353
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
54+
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties.ContainerType;
5455
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
5556
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
5657
import org.springframework.cloud.stream.provisioning.ProducerDestination;
@@ -268,6 +269,9 @@ private ConsumerDestination doProvisionConsumerDestination(String name, String g
268269
}
269270
Binding binding = null;
270271
if (properties.getExtension().isBindQueue()) {
272+
if (properties.getExtension().getContainerType().equals(ContainerType.STREAM)) {
273+
queue.getArguments().put("x-queue-type", "stream");
274+
}
271275
declareQueue(queueName, queue);
272276
String[] routingKeys = bindingRoutingKeys(properties.getExtension());
273277
if (ObjectUtils.isEmpty(routingKeys)) {

spring-cloud-stream-binder-rabbit-test-support/src/main/java/org/springframework/cloud/stream/binder/test/junit/rabbit/RabbitTestSupport.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import javax.net.ServerSocketFactory;
2828
import javax.net.SocketFactory;
2929

30+
import org.apache.commons.logging.Log;
31+
import org.apache.commons.logging.LogFactory;
32+
3033
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
3134
import org.springframework.cloud.stream.test.junit.AbstractExternalResourceTestSupport;
3235

@@ -72,6 +75,8 @@ protected void cleanupResource() throws Exception {
7275
*/
7376
public static class RabbitProxy {
7477

78+
private static final Log LOGGER = LogFactory.getLog(RabbitProxy.class);
79+
7580
private final int port;
7681

7782
private final ExecutorService serverExec = Executors.newSingleThreadExecutor();
@@ -93,14 +98,16 @@ public int getPort() {
9398

9499
public void start() throws IOException {
95100
this.serverSocket = ServerSocketFactory.getDefault()
96-
.createServerSocket(this.port);
101+
.createServerSocket(this.port, 10);
102+
LOGGER.info("Proxy started");
97103
this.serverExec.execute(new Runnable() {
98104

99105
@Override
100106
public void run() {
101107
try {
102108
while (true) {
103109
final Socket socket = serverSocket.accept();
110+
LOGGER.info("Accepted Connection");
104111
socketExec.execute(new Runnable() {
105112

106113
@Override
@@ -113,6 +120,7 @@ public void run() {
113120

114121
@Override
115122
public void run() {
123+
LOGGER.info("Running: " + rabbitSocket.getLocalPort());
116124
try {
117125
InputStream is = rabbitSocket
118126
.getInputStream();

spring-cloud-stream-binder-rabbit/pom.xml

+28
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,41 @@
5555
<groupId>org.springframework.boot</groupId>
5656
<artifactId>spring-boot-starter-amqp</artifactId>
5757
</dependency>
58+
<dependency>
59+
<groupId>org.springframework.amqp</groupId>
60+
<artifactId>spring-amqp</artifactId>
61+
<version>2.4.0-M1</version>
62+
</dependency>
63+
<dependency>
64+
<groupId>org.springframework.amqp</groupId>
65+
<artifactId>spring-rabbit</artifactId>
66+
<version>2.4.0-M1</version>
67+
</dependency>
68+
<dependency>
69+
<groupId>org.springframework.amqp</groupId>
70+
<artifactId>spring-rabbit-stream</artifactId>
71+
<version>2.4.0-M1</version>
72+
<optional>true</optional>
73+
</dependency>
5874
<dependency>
5975
<groupId>org.springframework.integration</groupId>
6076
<artifactId>spring-integration-jmx</artifactId>
77+
<version>5.5.2</version>
78+
</dependency>
79+
<dependency>
80+
<groupId>org.springframework.integration</groupId>
81+
<artifactId>spring-integration-amqp</artifactId>
82+
<version>5.5.2</version>
83+
</dependency>
84+
<dependency>
85+
<groupId>org.springframework.integration</groupId>
86+
<artifactId>spring-integration-core</artifactId>
87+
<version>5.5.2</version>
6188
</dependency>
6289
<dependency>
6390
<groupId>org.springframework.integration</groupId>
6491
<artifactId>spring-integration-test</artifactId>
92+
<version>5.5.2</version>
6593
<scope>test</scope>
6694
</dependency>
6795
<dependency>

spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java

+49-33
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.amqp.rabbit.core.RabbitTemplate;
5353
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
5454
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
55+
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
5556
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5657
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
5758
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
@@ -66,7 +67,6 @@
6667
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
6768
import org.springframework.beans.factory.DisposableBean;
6869
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
69-
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ContainerType;
7070
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Retry;
7171
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
7272
import org.springframework.cloud.stream.binder.BinderHeaders;
@@ -78,6 +78,7 @@
7878
import org.springframework.cloud.stream.binder.HeaderMode;
7979
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
8080
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
81+
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties.ContainerType;
8182
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties;
8283
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
8384
import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner;
@@ -186,15 +187,15 @@ public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
186187
public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
187188
RabbitProperties rabbitProperties,
188189
RabbitExchangeQueueProvisioner provisioningProvider,
189-
ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer) {
190+
ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer) {
190191

191192
this(connectionFactory, rabbitProperties, provisioningProvider, containerCustomizer, null);
192193
}
193194

194195
public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
195196
RabbitProperties rabbitProperties,
196197
RabbitExchangeQueueProvisioner provisioningProvider,
197-
ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer,
198+
ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer,
198199
MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer) {
199200

200201
super(new String[0], provisioningProvider, containerCustomizer, sourceCustomizer);
@@ -460,6 +461,50 @@ protected MessageProducer createConsumerEndpoint(
460461
"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
461462
String destination = consumerDestination.getName();
462463
RabbitConsumerProperties extension = properties.getExtension();
464+
MessageListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group,
465+
properties, destination, extension);
466+
String[] queues = StringUtils.tokenizeToStringArray(destination, ",", true, true);
467+
listenerContainer.setQueueNames(queues);
468+
getContainerCustomizer().configure(listenerContainer,
469+
consumerDestination.getName(), group);
470+
listenerContainer.afterPropertiesSet();
471+
472+
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
473+
adapter.setBindSourceMessage(true);
474+
adapter.setBeanFactory(this.getBeanFactory());
475+
adapter.setBeanName("inbound." + destination);
476+
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
477+
mapper.setRequestHeaderNames(extension.getHeaderPatterns());
478+
adapter.setHeaderMapper(mapper);
479+
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(
480+
consumerDestination, group, properties);
481+
if (properties.getMaxAttempts() > 1) {
482+
adapter.setRetryTemplate(buildRetryTemplate(properties));
483+
adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
484+
}
485+
else {
486+
adapter.setErrorMessageStrategy(errorMessageStrategy);
487+
adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
488+
}
489+
adapter.setMessageConverter(passThoughConverter);
490+
if (properties.isBatchMode() && extension.isEnableBatching()
491+
&& ContainerType.SIMPLE.equals(extension.getContainerType())) {
492+
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
493+
}
494+
if (extension.getContainerType().equals(ContainerType.STREAM)) {
495+
StreamContainerUtils.configureAdapter(adapter);
496+
}
497+
return adapter;
498+
}
499+
500+
private MessageListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination,
501+
String group, ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination,
502+
RabbitConsumerProperties extension) {
503+
504+
if (extension.getContainerType().equals(ContainerType.STREAM)) {
505+
return StreamContainerUtils.createContainer(consumerDestination, group, properties, destination, extension,
506+
getApplicationContext());
507+
}
463508
boolean directContainer = extension.getContainerType()
464509
.equals(ContainerType.DIRECT);
465510
AbstractMessageListenerContainer listenerContainer = directContainer
@@ -485,8 +530,6 @@ protected MessageProducer createConsumerEndpoint(
485530
.setRecoveryInterval(extension.getRecoveryInterval());
486531
listenerContainer.setTaskExecutor(
487532
new SimpleAsyncTaskExecutor(consumerDestination.getName() + "-"));
488-
String[] queues = StringUtils.tokenizeToStringArray(destination, ",", true, true);
489-
listenerContainer.setQueueNames(queues);
490533
listenerContainer.setAfterReceivePostProcessors(this.decompressingPostProcessor);
491534
listenerContainer.setMessagePropertiesConverter(
492535
RabbitMessageChannelBinder.inboundMessagePropertiesConverter);
@@ -504,40 +547,13 @@ protected MessageProducer createConsumerEndpoint(
504547
else if (getApplicationContext() != null) {
505548
listenerContainer.setApplicationEventPublisher(getApplicationContext());
506549
}
507-
getContainerCustomizer().configure(listenerContainer,
508-
consumerDestination.getName(), group);
509550
if (StringUtils.hasText(extension.getConsumerTagPrefix())) {
510551
final AtomicInteger index = new AtomicInteger();
511552
listenerContainer.setConsumerTagStrategy(
512553
q -> extension.getConsumerTagPrefix() + "#"
513554
+ index.getAndIncrement());
514555
}
515-
listenerContainer.afterPropertiesSet();
516-
517-
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(
518-
listenerContainer);
519-
adapter.setBindSourceMessage(true);
520-
adapter.setBeanFactory(this.getBeanFactory());
521-
adapter.setBeanName("inbound." + destination);
522-
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
523-
mapper.setRequestHeaderNames(extension.getHeaderPatterns());
524-
adapter.setHeaderMapper(mapper);
525-
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(
526-
consumerDestination, group, properties);
527-
if (properties.getMaxAttempts() > 1) {
528-
adapter.setRetryTemplate(buildRetryTemplate(properties));
529-
adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
530-
}
531-
else {
532-
adapter.setErrorMessageStrategy(errorMessageStrategy);
533-
adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
534-
}
535-
adapter.setMessageConverter(passThoughConverter);
536-
if (properties.isBatchMode() && extension.isEnableBatching()
537-
&& ContainerType.SIMPLE.equals(extension.getContainerType())) {
538-
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
539-
}
540-
return adapter;
556+
return listenerContainer;
541557
}
542558

543559
private void setSMLCProperties(

0 commit comments

Comments
 (0)