Skip to content

Commit 64a8f5e

Browse files
committed
spring-atticGH-325: Initial Support for RabbitMQ Stream Plugin
Resolves spring-attic#325
1 parent 57b89c5 commit 64a8f5e

File tree

12 files changed

+453
-40
lines changed

12 files changed

+453
-40
lines changed

docs/src/main/asciidoc/overview.adoc

+34
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ Default: none - the broker will generate random consumer tags.
164164
containerType::
165165
Select the type of listener container to be used.
166166
See https://docs.spring.io/spring-amqp/reference/html/_reference.html#choose-container[Choosing a Container] in the Spring AMQP documentation for more information.
167+
Also see <<rabbitmq-stream>>.
167168
+
168169
Default: `simple`
169170
deadLetterQueueName::
@@ -413,6 +414,39 @@ Not supported when the `containerType` is `direct`.
413414
+
414415
Default: `1`.
415416

417+
[[rabbitmq-stream]]
418+
=== Initial Support for the RabbitMQ Stream Plugin
419+
420+
Basic support for the https://rabbitmq.com/stream.html[RabbitMQ Stream Plugin] is now provided.
421+
422+
IMPORTANT: The consumer properties described above are not supported when you set the `containerType` property to `stream`; `concurrency` is also not supported at this time.
423+
424+
To configure the binder to use `containerType=stream`, you must add an `Environment` `@Bean` and, optionally, a customizer to customize the listener container.
425+
426+
====
427+
[source, java]
428+
----
429+
@Bean
430+
Environment streamEnv() {
431+
return Environment.builder()
432+
.build();
433+
}
434+
435+
@Bean
436+
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
437+
return (cont, dest, group) -> {
438+
StreamListenerContainer container = (StreamListenerContainer) cont;
439+
container.setConsumerCustomizer(builder -> {
440+
builder.offset(OffsetSpecification.first());
441+
});
442+
// ...
443+
};
444+
}
445+
----
446+
====
447+
448+
Refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/[RabbitMQ Stream Java Client documentation] for information about configuring the environment and consumer builder.
449+
416450
=== Advanced Listener Container Configuration
417451

418452
To set listener container properties that are not exposed as binder or binding properties, add a single bean of type `ListenerContainerCustomizer` to the application context.

spring-cloud-stream-binder-rabbit-core/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
<dependency>
2626
<groupId>org.springframework.integration</groupId>
2727
<artifactId>spring-integration-amqp</artifactId>
28+
<version>5.5.2</version>
2829
</dependency>
2930
<dependency>
3031
<groupId>org.springframework.boot</groupId>

spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/properties/RabbitConsumerProperties.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.springframework.amqp.core.AcknowledgeMode;
2222
import org.springframework.amqp.core.MessageDeliveryMode;
23-
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ContainerType;
2423
import org.springframework.util.Assert;
2524

2625
/**
@@ -348,4 +347,30 @@ public void setReceiveTimeout(Long receiveTimeout) {
348347
this.receiveTimeout = receiveTimeout;
349348
}
350349

350+
/**
351+
* Container type.
352+
* @author Gary Russell
353+
* @since 3.2
354+
*
355+
*/
356+
public enum ContainerType {
357+
358+
/**
359+
* Container where the RabbitMQ consumer dispatches messages to an invoker thread.
360+
*/
361+
SIMPLE,
362+
363+
/**
364+
* Container where the listener is invoked directly on the RabbitMQ consumer
365+
* thread.
366+
*/
367+
DIRECT,
368+
369+
/**
370+
* Container that uses the RabbitMQ Stream Client.
371+
*/
372+
STREAM
373+
374+
}
375+
351376
}

spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
5252
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties.QuorumConfig;
5353
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
54+
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties.ContainerType;
5455
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
5556
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
5657
import org.springframework.cloud.stream.provisioning.ProducerDestination;
@@ -268,6 +269,9 @@ private ConsumerDestination doProvisionConsumerDestination(String name, String g
268269
}
269270
Binding binding = null;
270271
if (properties.getExtension().isBindQueue()) {
272+
if (properties.getExtension().getContainerType().equals(ContainerType.STREAM)) {
273+
queue.getArguments().put("x-queue-type", "stream");
274+
}
271275
declareQueue(queueName, queue);
272276
String[] routingKeys = bindingRoutingKeys(properties.getExtension());
273277
if (ObjectUtils.isEmpty(routingKeys)) {

spring-cloud-stream-binder-rabbit-test-support/src/main/java/org/springframework/cloud/stream/binder/test/junit/rabbit/RabbitTestSupport.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public int getPort() {
9393

9494
public void start() throws IOException {
9595
this.serverSocket = ServerSocketFactory.getDefault()
96-
.createServerSocket(this.port);
96+
.createServerSocket(this.port, 10);
9797
this.serverExec.execute(new Runnable() {
9898

9999
@Override

spring-cloud-stream-binder-rabbit/pom.xml

+28
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,41 @@
5555
<groupId>org.springframework.boot</groupId>
5656
<artifactId>spring-boot-starter-amqp</artifactId>
5757
</dependency>
58+
<dependency>
59+
<groupId>org.springframework.amqp</groupId>
60+
<artifactId>spring-amqp</artifactId>
61+
<version>2.4.0-M1</version>
62+
</dependency>
63+
<dependency>
64+
<groupId>org.springframework.amqp</groupId>
65+
<artifactId>spring-rabbit</artifactId>
66+
<version>2.4.0-M1</version>
67+
</dependency>
68+
<dependency>
69+
<groupId>org.springframework.amqp</groupId>
70+
<artifactId>spring-rabbit-stream</artifactId>
71+
<version>2.4.0-M1</version>
72+
<optional>true</optional>
73+
</dependency>
5874
<dependency>
5975
<groupId>org.springframework.integration</groupId>
6076
<artifactId>spring-integration-jmx</artifactId>
77+
<version>5.5.2</version>
78+
</dependency>
79+
<dependency>
80+
<groupId>org.springframework.integration</groupId>
81+
<artifactId>spring-integration-amqp</artifactId>
82+
<version>5.5.2</version>
83+
</dependency>
84+
<dependency>
85+
<groupId>org.springframework.integration</groupId>
86+
<artifactId>spring-integration-core</artifactId>
87+
<version>5.5.2</version>
6188
</dependency>
6289
<dependency>
6390
<groupId>org.springframework.integration</groupId>
6491
<artifactId>spring-integration-test</artifactId>
92+
<version>5.5.2</version>
6593
<scope>test</scope>
6694
</dependency>
6795
<dependency>

spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java

+46-33
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.amqp.rabbit.core.RabbitTemplate;
5353
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
5454
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
55+
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
5556
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5657
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
5758
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
@@ -66,7 +67,6 @@
6667
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
6768
import org.springframework.beans.factory.DisposableBean;
6869
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
69-
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ContainerType;
7070
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Retry;
7171
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
7272
import org.springframework.cloud.stream.binder.BinderHeaders;
@@ -78,6 +78,7 @@
7878
import org.springframework.cloud.stream.binder.HeaderMode;
7979
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
8080
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
81+
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties.ContainerType;
8182
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties;
8283
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
8384
import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner;
@@ -186,15 +187,15 @@ public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
186187
public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
187188
RabbitProperties rabbitProperties,
188189
RabbitExchangeQueueProvisioner provisioningProvider,
189-
ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer) {
190+
ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer) {
190191

191192
this(connectionFactory, rabbitProperties, provisioningProvider, containerCustomizer, null);
192193
}
193194

194195
public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
195196
RabbitProperties rabbitProperties,
196197
RabbitExchangeQueueProvisioner provisioningProvider,
197-
ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer,
198+
ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer,
198199
MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer) {
199200

200201
super(new String[0], provisioningProvider, containerCustomizer, sourceCustomizer);
@@ -460,6 +461,47 @@ protected MessageProducer createConsumerEndpoint(
460461
"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
461462
String destination = consumerDestination.getName();
462463
RabbitConsumerProperties extension = properties.getExtension();
464+
MessageListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group,
465+
properties, destination, extension);
466+
String[] queues = StringUtils.tokenizeToStringArray(destination, ",", true, true);
467+
listenerContainer.setQueueNames(queues);
468+
getContainerCustomizer().configure(listenerContainer,
469+
consumerDestination.getName(), group);
470+
listenerContainer.afterPropertiesSet();
471+
472+
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
473+
adapter.setBindSourceMessage(true);
474+
adapter.setBeanFactory(this.getBeanFactory());
475+
adapter.setBeanName("inbound." + destination);
476+
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
477+
mapper.setRequestHeaderNames(extension.getHeaderPatterns());
478+
adapter.setHeaderMapper(mapper);
479+
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(
480+
consumerDestination, group, properties);
481+
if (properties.getMaxAttempts() > 1) {
482+
adapter.setRetryTemplate(buildRetryTemplate(properties));
483+
adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
484+
}
485+
else {
486+
adapter.setErrorMessageStrategy(errorMessageStrategy);
487+
adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
488+
}
489+
adapter.setMessageConverter(passThoughConverter);
490+
if (properties.isBatchMode() && extension.isEnableBatching()
491+
&& ContainerType.SIMPLE.equals(extension.getContainerType())) {
492+
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
493+
}
494+
return adapter;
495+
}
496+
497+
private MessageListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination,
498+
String group, ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination,
499+
RabbitConsumerProperties extension) {
500+
501+
if (extension.getContainerType().equals(ContainerType.STREAM)) {
502+
return StreamContainerUtils.createContainer(consumerDestination, group, properties, destination, extension,
503+
getApplicationContext());
504+
}
463505
boolean directContainer = extension.getContainerType()
464506
.equals(ContainerType.DIRECT);
465507
AbstractMessageListenerContainer listenerContainer = directContainer
@@ -485,8 +527,6 @@ protected MessageProducer createConsumerEndpoint(
485527
.setRecoveryInterval(extension.getRecoveryInterval());
486528
listenerContainer.setTaskExecutor(
487529
new SimpleAsyncTaskExecutor(consumerDestination.getName() + "-"));
488-
String[] queues = StringUtils.tokenizeToStringArray(destination, ",", true, true);
489-
listenerContainer.setQueueNames(queues);
490530
listenerContainer.setAfterReceivePostProcessors(this.decompressingPostProcessor);
491531
listenerContainer.setMessagePropertiesConverter(
492532
RabbitMessageChannelBinder.inboundMessagePropertiesConverter);
@@ -504,40 +544,13 @@ protected MessageProducer createConsumerEndpoint(
504544
else if (getApplicationContext() != null) {
505545
listenerContainer.setApplicationEventPublisher(getApplicationContext());
506546
}
507-
getContainerCustomizer().configure(listenerContainer,
508-
consumerDestination.getName(), group);
509547
if (StringUtils.hasText(extension.getConsumerTagPrefix())) {
510548
final AtomicInteger index = new AtomicInteger();
511549
listenerContainer.setConsumerTagStrategy(
512550
q -> extension.getConsumerTagPrefix() + "#"
513551
+ index.getAndIncrement());
514552
}
515-
listenerContainer.afterPropertiesSet();
516-
517-
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(
518-
listenerContainer);
519-
adapter.setBindSourceMessage(true);
520-
adapter.setBeanFactory(this.getBeanFactory());
521-
adapter.setBeanName("inbound." + destination);
522-
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
523-
mapper.setRequestHeaderNames(extension.getHeaderPatterns());
524-
adapter.setHeaderMapper(mapper);
525-
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(
526-
consumerDestination, group, properties);
527-
if (properties.getMaxAttempts() > 1) {
528-
adapter.setRetryTemplate(buildRetryTemplate(properties));
529-
adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
530-
}
531-
else {
532-
adapter.setErrorMessageStrategy(errorMessageStrategy);
533-
adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
534-
}
535-
adapter.setMessageConverter(passThoughConverter);
536-
if (properties.isBatchMode() && extension.isEnableBatching()
537-
&& ContainerType.SIMPLE.equals(extension.getContainerType())) {
538-
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
539-
}
540-
return adapter;
553+
return listenerContainer;
541554
}
542555

543556
private void setSMLCProperties(

0 commit comments

Comments
 (0)