Skip to content

Commit 68d74bd

Browse files
garyrussellartembilan
authored andcommitted
AMQP-811: Listener container factory improvements
JIRA: https://jira.spring.io/browse/AMQP-811 - `ChannelAwareMessageListener` now inherits from `MessageListener` - enables `SimpleRabbitListenerEndpoint` to be used with either - deprecate setters for CAML - Support creating a container with no listener Polishing - PR comments; remove onMessage(message) from AAML - it was never called since CAML is always preferred by containers. * Polishing for some stale JavaDocs * Polishing for the `AsyncRabbitTemplate` to use diamonds and don't use `rawtypes` * Fix typo in the `amqp.adoc`
1 parent 67a6857 commit 68d74bd

21 files changed

+225
-120
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,9 @@ public class AsyncRabbitTemplate implements AsyncAmqpTemplate, ChannelAwareMessa
101101

102102
private final String replyAddress;
103103

104-
@SuppressWarnings("rawtypes")
105-
private final ConcurrentMap<String, RabbitFuture> pending = new ConcurrentHashMap<String, RabbitFuture>();
104+
private final ConcurrentMap<String, RabbitFuture<?>> pending = new ConcurrentHashMap<>();
106105

107-
@SuppressWarnings("rawtypes")
108-
private final CorrelationMessagePostProcessor messagePostProcessor = new CorrelationMessagePostProcessor<>();
106+
private final CorrelationMessagePostProcessor<?> messagePostProcessor = new CorrelationMessagePostProcessor<>();
109107

110108
private volatile boolean running;
111109

@@ -224,7 +222,7 @@ public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange,
224222
this.container = null;
225223
this.replyAddress = null;
226224
this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
227-
this.directReplyToContainer.setChannelAwareMessageListener(this);
225+
this.directReplyToContainer.setMessageListener(this);
228226
}
229227

230228
/**
@@ -239,7 +237,7 @@ public AsyncRabbitTemplate(RabbitTemplate template) {
239237
this.container = null;
240238
this.replyAddress = null;
241239
this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
242-
this.directReplyToContainer.setChannelAwareMessageListener(this);
240+
this.directReplyToContainer.setMessageListener(this);
243241
}
244242

245243
/**

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -353,10 +353,7 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
353353
if (this.applicationEventPublisher != null) {
354354
instance.setApplicationEventPublisher(this.applicationEventPublisher);
355355
}
356-
if (endpoint.getAutoStartup() != null) {
357-
instance.setAutoStartup(endpoint.getAutoStartup());
358-
}
359-
else if (this.autoStartup != null) {
356+
if (this.autoStartup != null) {
360357
instance.setAutoStartup(this.autoStartup);
361358
}
362359
if (this.phase != null) {
@@ -365,9 +362,14 @@ else if (this.autoStartup != null) {
365362
if (this.afterReceivePostProcessors != null) {
366363
instance.setAfterReceivePostProcessors(this.afterReceivePostProcessors);
367364
}
368-
instance.setListenerId(endpoint.getId());
365+
if (endpoint != null) {
366+
if (endpoint.getAutoStartup() != null) {
367+
instance.setAutoStartup(endpoint.getAutoStartup());
368+
}
369+
instance.setListenerId(endpoint.getId());
369370

370-
endpoint.setupListenerContainer(instance);
371+
endpoint.setupListenerContainer(instance);
372+
}
371373
if (this.beforeSendReplyPostProcessors != null
372374
&& instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
373375
((AbstractAdaptableMessageListener) instance.getMessageListener())

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/DirectRabbitListenerContainerFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,7 +83,7 @@ protected void initializeContainer(DirectMessageListenerContainer instance, Rabb
8383
if (this.monitorInterval != null) {
8484
instance.setMonitorInterval(this.monitorInterval);
8585
}
86-
if (endpoint.getConcurrency() != null) {
86+
if (endpoint != null && endpoint.getConcurrency() != null) {
8787
try {
8888
instance.setConsumersPerQueue(Integer.parseInt(endpoint.getConcurrency()));
8989
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.aopalliance.aop.Advice;
2323

2424
import org.springframework.amqp.core.AcknowledgeMode;
25+
import org.springframework.amqp.core.MessageListener;
2526
import org.springframework.amqp.core.MessagePostProcessor;
2627
import org.springframework.amqp.core.Queue;
2728
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -82,7 +83,7 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMe
8283

8384
private Boolean exposeListenerChannel;
8485

85-
private Object messageListener;
86+
private MessageListener messageListener;
8687

8788
private ErrorHandler errorHandler;
8889

@@ -211,7 +212,7 @@ public void setExposeListenerChannel(boolean exposeListenerChannel) {
211212
this.exposeListenerChannel = exposeListenerChannel;
212213
}
213214

214-
public void setMessageListener(Object messageListener) {
215+
public void setMessageListener(MessageListener messageListener) {
215216
this.messageListener = messageListener;
216217
}
217218

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -141,11 +141,14 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
141141
if (this.txSize != null) {
142142
instance.setTxSize(this.txSize);
143143
}
144-
String concurrency = endpoint.getConcurrency();
145-
if (concurrency != null) {
146-
instance.setConcurrency(concurrency);
144+
String concurrency = null;
145+
if (endpoint != null) {
146+
concurrency = endpoint.getConcurrency();
147+
if (concurrency != null) {
148+
instance.setConcurrency(concurrency);
149+
}
147150
}
148-
else if (this.concurrentConsumers != null) {
151+
if (concurrency == null && this.concurrentConsumers != null) {
149152
instance.setConcurrentConsumers(this.concurrentConsumers);
150153
}
151154
if ((concurrency == null || !(concurrency.contains("-"))) && this.maxConcurrentConsumers != null) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -394,23 +394,23 @@ public void setExposeListenerChannel(boolean exposeListenerChannel) {
394394
/**
395395
* Set the message listener implementation to register. This can be either a Spring
396396
* {@link MessageListener} object or a Spring {@link ChannelAwareMessageListener}
397-
* object. Using the strongly typed
398-
* {@link #setChannelAwareMessageListener(ChannelAwareMessageListener)} is preferred.
397+
* object.
399398
*
400399
* @param messageListener The listener.
401400
* @throws IllegalArgumentException if the supplied listener is not a
402401
* {@link MessageListener} or a {@link ChannelAwareMessageListener}
402+
* @deprecated use {@link #setMessageListener(MessageListener)}.
403403
* @see MessageListener
404404
* @see ChannelAwareMessageListener
405405
*/
406+
@Deprecated
406407
public void setMessageListener(Object messageListener) {
407408
checkMessageListener(messageListener);
408409
this.messageListener = messageListener;
409410
}
410411

411412
/**
412-
* Set the {@link MessageListener}; strongly typed version of
413-
* {@link #setMessageListener(Object)}.
413+
* Set the {@link MessageListener}.
414414
* @param messageListener the listener.
415415
* @since 2.0
416416
*/
@@ -419,27 +419,27 @@ public void setMessageListener(MessageListener messageListener) {
419419
}
420420

421421
/**
422-
* Set the {@link ChannelAwareMessageListener}; strongly typed version of
423-
* {@link #setMessageListener(Object)}.
422+
* Set the {@link ChannelAwareMessageListener}.
424423
* @param messageListener the listener.
425424
* @since 2.0
425+
* @deprecated use {@link #setMessageListener(MessageListener)} since
426+
* {@link ChannelAwareMessageListener} now inherits {@link MessageListener}.
426427
*/
428+
@Deprecated
427429
public void setChannelAwareMessageListener(ChannelAwareMessageListener messageListener) {
428430
this.messageListener = messageListener;
429431
}
430432

431433
/**
432434
* Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
433435
* <p>
434-
* By default, only a Spring {@link MessageListener} object or a Spring
435-
* {@link ChannelAwareMessageListener} object will be accepted.
436+
* Only a Spring {@link MessageListener} object will be accepted.
436437
* @param messageListener the message listener object to check
437-
* @throws IllegalArgumentException if the supplied listener is not a MessageListener or SessionAwareMessageListener
438+
* @throws IllegalArgumentException if the supplied listener is not a MessageListener
438439
* @see MessageListener
439-
* @see ChannelAwareMessageListener
440440
*/
441441
protected void checkMessageListener(Object messageListener) {
442-
if (!(messageListener instanceof MessageListener || messageListener instanceof ChannelAwareMessageListener)) {
442+
if (!(messageListener instanceof MessageListener)) {
443443
throw new IllegalArgumentException("Message listener needs to be of type ["
444444
+ MessageListener.class.getName() + "] or [" + ChannelAwareMessageListener.class.getName() + "]");
445445
}
@@ -1071,7 +1071,7 @@ public final void afterPropertiesSet() {
10711071
}
10721072

10731073
@Override
1074-
public void setupMessageListener(Object messageListener) {
1074+
public void setupMessageListener(MessageListener messageListener) {
10751075
setMessageListener(messageListener);
10761076
}
10771077

@@ -1392,7 +1392,7 @@ protected void invokeListener(Channel channel, Message message) throws Exception
13921392
* @param channel the Rabbit Channel to operate on
13931393
* @param message the received Rabbit Message
13941394
* @throws Exception if thrown by Rabbit API methods
1395-
* @see #setMessageListener
1395+
* @see #setMessageListener(MessageListener)
13961396
*/
13971397
protected void actualInvokeListener(Channel channel, Message message) throws Exception {
13981398
Object listener = getMessageListener();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectReplyToMessageListenerContainer.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -82,33 +82,42 @@ public final boolean removeQueueNames(String... queueNames) {
8282
}
8383

8484
@Override
85+
@SuppressWarnings("deprecation")
86+
@Deprecated
8587
public void setMessageListener(Object messageListener) {
8688
throw new UnsupportedOperationException(
8789
"'messageListener' must be a 'MessageListener' or 'ChannelAwareMessageListener'");
8890
}
8991

9092
@Override
93+
@SuppressWarnings("deprecation")
94+
@Deprecated
9195
public void setChannelAwareMessageListener(ChannelAwareMessageListener messageListener) {
92-
super.setChannelAwareMessageListener((message, channel) -> {
93-
try {
94-
messageListener.onMessage(message, channel);
95-
}
96-
finally {
97-
this.inUseConsumerChannels.remove(channel);
98-
}
99-
});
96+
setMessageListener(messageListener);
10097
}
10198

10299
@Override
103100
public void setMessageListener(MessageListener messageListener) {
104-
super.setChannelAwareMessageListener((message, channel) -> {
105-
try {
106-
messageListener.onMessage(message);
107-
}
108-
finally {
109-
this.inUseConsumerChannels.remove(channel);
110-
}
111-
});
101+
if (messageListener instanceof ChannelAwareMessageListener) {
102+
super.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
103+
try {
104+
((ChannelAwareMessageListener) messageListener).onMessage(message, channel);
105+
}
106+
finally {
107+
this.inUseConsumerChannels.remove(channel);
108+
}
109+
});
110+
}
111+
else {
112+
super.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
113+
try {
114+
messageListener.onMessage(message);
115+
}
116+
finally {
117+
this.inUseConsumerChannels.remove(channel);
118+
}
119+
});
120+
}
112121
}
113122

114123
@Override

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/MessageListenerContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.amqp.rabbit.listener;
1818

19+
import org.springframework.amqp.core.MessageListener;
1920
import org.springframework.amqp.support.converter.MessageConverter;
2021
import org.springframework.context.SmartLifecycle;
2122

@@ -33,7 +34,7 @@ public interface MessageListenerContainer extends SmartLifecycle {
3334
* if that message listener type is not supported.
3435
* @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
3536
*/
36-
void setupMessageListener(Object messageListener);
37+
void setupMessageListener(MessageListener messageListener);
3738

3839
/**
3940
* @return the {@link MessageConverter} that can be used to

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/RabbitListenerContainerFactory.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,11 @@
1616

1717
package org.springframework.amqp.rabbit.listener;
1818

19-
19+
import org.springframework.amqp.core.MessageListener;
20+
import org.springframework.lang.Nullable;
2021

2122
/**
22-
* Factory of {@link MessageListenerContainer} based on a
23-
* {@link RabbitListenerEndpoint} definition.
23+
* Factory of {@link MessageListenerContainer}s.
2424
* @param <C> the container type.
2525
* @author Stephane Nicoll
2626
* @author Gary Russell
@@ -31,10 +31,21 @@
3131
public interface RabbitListenerContainerFactory<C extends MessageListenerContainer> {
3232

3333
/**
34-
* Create a {@link MessageListenerContainer} for the given {@link RabbitListenerEndpoint}.
35-
* @param endpoint the endpoint to configure
36-
* @return the created container
34+
* Create a {@link MessageListenerContainer} for the given
35+
* {@link RabbitListenerEndpoint}.
36+
* @param endpoint the endpoint to configure.
37+
* @return the created container.
38+
*/
39+
C createListenerContainer(@Nullable RabbitListenerEndpoint endpoint);
40+
41+
/**
42+
* Create a {@link MessageListenerContainer} with no {@link MessageListener}
43+
* or queues; the listener must be added later before the container is started.
44+
* @return the created container.
45+
* @since 2.1.
3746
*/
38-
C createListenerContainer(RabbitListenerEndpoint endpoint);
47+
default C createListenerContainer() {
48+
return createListenerContainer(null);
49+
}
3950

4051
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.springframework.amqp.core.MessagePostProcessor;
3030
import org.springframework.amqp.core.MessageProperties;
3131
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
32-
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
3332
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
3433
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
3534
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
@@ -58,10 +57,9 @@
5857
*
5958
* @since 1.4
6059
*
61-
* @see MessageListener
6260
* @see ChannelAwareMessageListener
6361
*/
64-
public abstract class AbstractAdaptableMessageListener implements MessageListener, ChannelAwareMessageListener {
62+
public abstract class AbstractAdaptableMessageListener implements ChannelAwareMessageListener {
6563

6664
private static final String DEFAULT_RESPONSE_ROUTING_KEY = "";
6765

@@ -219,27 +217,6 @@ protected MessageConverter getMessageConverter() {
219217
return this.messageConverter;
220218
}
221219

222-
/**
223-
* Rabbit {@link MessageListener} entry point.
224-
* <p>
225-
* Delegates the message to the target listener method, with appropriate conversion of the message argument.
226-
* <p>
227-
* <b>Note:</b> Does not support sending response messages based on result objects returned from listener methods.
228-
* Use the {@link ChannelAwareMessageListener} entry point (typically through a Spring message listener container)
229-
* for handling result objects as well.
230-
* @param message the incoming Rabbit message
231-
* @see #onMessage(Message, com.rabbitmq.client.Channel)
232-
*/
233-
@Override
234-
public void onMessage(Message message) {
235-
try {
236-
onMessage(message, null);
237-
}
238-
catch (Exception e) {
239-
throw new ListenerExecutionFailedException("Listener threw exception", e, message);
240-
}
241-
}
242-
243220
/**
244221
* Handle the given exception that arose during listener execution.
245222
* The default implementation logs the exception at error level.

0 commit comments

Comments
 (0)