Skip to content

Commit e50cb89

Browse files
artembilangaryrussell
authored andcommitted
AMQP-801-2: Introduce ConsumerDecorator
JIRA: https://jira.spring.io/browse/AMQP-801 To properly assign the queue to the `ConsumeOkEvent`, we need perform such a logic in the `Consumer.handleConsumeOk()`. * Introduce `BlockingQueueConsumer.ConsumerDecorator` to be created on each `channel.basicConsume()` for wrapping the target `InternalConsumer` per queue * Add getters to the `ConsumeOkEvent` for better interoperability * Assert assigned queue names for the `ConsumeOkEvent`s in the `SimpleMessageListenerContainerIntegration2Tests` **Cherry-pick to 1.7.x** * Add `ConsumerDecorator.consumerTag` property * Add `ConsumerDecorator.toString()` * Add JavaDocs for the `ConsumeOkEvent`
1 parent 3dba703 commit e50cb89

File tree

3 files changed

+113
-14
lines changed

3 files changed

+113
-14
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import com.rabbitmq.client.AMQP;
6363
import com.rabbitmq.client.AlreadyClosedException;
6464
import com.rabbitmq.client.Channel;
65+
import com.rabbitmq.client.Consumer;
6566
import com.rabbitmq.client.DefaultConsumer;
6667
import com.rabbitmq.client.Envelope;
6768
import com.rabbitmq.client.Recoverable;
@@ -237,10 +238,11 @@ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
237238
* @param queues The queues.
238239
*/
239240
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
240-
MessagePropertiesConverter messagePropertiesConverter,
241-
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
242-
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
243-
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
241+
MessagePropertiesConverter messagePropertiesConverter,
242+
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
243+
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
244+
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
245+
244246
this(connectionFactory, messagePropertiesConverter, activeObjectCounter, acknowledgeMode, transactional,
245247
prefetchCount, defaultRequeueRejected, consumerArgs, false, exclusive, queues);
246248
}
@@ -483,7 +485,9 @@ private Message handle(Delivery delivery) throws InterruptedException {
483485
* @throws ShutdownSignalException if the connection is shut down while waiting
484486
*/
485487
public Message nextMessage() throws InterruptedException, ShutdownSignalException {
486-
logger.trace("Retrieving delivery for " + this);
488+
if (logger.isTraceEnabled()) {
489+
logger.trace("Retrieving delivery for " + this);
490+
}
487491
return handle(this.queue.take());
488492
}
489493

@@ -673,8 +677,10 @@ private void addRecoveryListener() {
673677

674678
private void consumeFromQueue(String queue) throws IOException {
675679
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
676-
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal, this.exclusive,
677-
this.consumerArgs, this.consumer);
680+
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
681+
this.exclusive, this.consumerArgs,
682+
new ConsumerDecorator(queue, this.consumer, this.applicationEventPublisher));
683+
678684
if (consumerTag != null) {
679685
this.consumerTags.put(consumerTag, queue);
680686
if (logger.isDebugEnabled()) {
@@ -817,7 +823,7 @@ public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
817823
*/
818824
boolean isLocallyTransacted = locallyTransacted
819825
|| (this.transactional
820-
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
826+
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
821827
try {
822828

823829
boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
@@ -884,11 +890,6 @@ public void handleConsumeOk(String consumerTag) {
884890
if (logger.isDebugEnabled()) {
885891
logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);
886892
}
887-
if (BlockingQueueConsumer.this.applicationEventPublisher != null) {
888-
String queueName = BlockingQueueConsumer.this.consumerTags.get(consumerTag);
889-
BlockingQueueConsumer.this.applicationEventPublisher
890-
.publishEvent(new ConsumeOkEvent(this, queueName, consumerTag));
891-
}
892893
}
893894

894895
@Override
@@ -962,6 +963,68 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
962963

963964
}
964965

966+
private static final class ConsumerDecorator implements Consumer {
967+
968+
private final String queue;
969+
970+
private final Consumer delegate;
971+
972+
private final ApplicationEventPublisher applicationEventPublisher;
973+
974+
private String consumerTag;
975+
976+
ConsumerDecorator(String queue, Consumer delegate, ApplicationEventPublisher applicationEventPublisher) {
977+
this.queue = queue;
978+
this.delegate = delegate;
979+
this.applicationEventPublisher = applicationEventPublisher;
980+
}
981+
982+
983+
@Override
984+
public void handleConsumeOk(String consumerTag) {
985+
this.consumerTag = consumerTag;
986+
this.delegate.handleConsumeOk(consumerTag);
987+
if (this.applicationEventPublisher != null) {
988+
this.applicationEventPublisher.publishEvent(new ConsumeOkEvent(this.delegate, this.queue, consumerTag));
989+
}
990+
}
991+
992+
@Override
993+
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
994+
this.delegate.handleShutdownSignal(consumerTag, sig);
995+
}
996+
997+
@Override
998+
public void handleCancel(String consumerTag) throws IOException {
999+
this.delegate.handleCancel(consumerTag);
1000+
}
1001+
1002+
@Override
1003+
public void handleCancelOk(String consumerTag) {
1004+
this.delegate.handleCancelOk(consumerTag);
1005+
}
1006+
1007+
@Override
1008+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
1009+
byte[] body) throws IOException {
1010+
1011+
this.delegate.handleDelivery(consumerTag, envelope, properties, body);
1012+
}
1013+
1014+
@Override
1015+
public void handleRecoverOk(String consumerTag) {
1016+
this.delegate.handleRecoverOk(consumerTag);
1017+
}
1018+
1019+
@Override
1020+
public String toString() {
1021+
return "ConsumerDecorator{" + "queue='" + this.queue + '\'' +
1022+
", consumerTag='" + this.consumerTag + '\'' +
1023+
'}';
1024+
}
1025+
1026+
}
1027+
9651028
@SuppressWarnings("serial")
9661029
private static final class DeclarationException extends AmqpException {
9671030

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ConsumeOkEvent.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,12 @@
1919
import org.springframework.amqp.event.AmqpEvent;
2020

2121
/**
22+
* An {@link AmqpEvent} emitted by the listener container
23+
* when consumer is subscribed to the queue.
24+
*
2225
* @author Gary Russell
26+
* @author Artem Bilan
27+
*
2328
* @since 1.7.5
2429
*
2530
*/
@@ -30,12 +35,37 @@ public class ConsumeOkEvent extends AmqpEvent {
3035

3136
private final String consumerTag;
3237

38+
/**
39+
* Instantiate a {@link ConsumeOkEvent} based on the provided
40+
* consumer, queue and consumer tag.
41+
* @param source the consumer subscribed to the queue
42+
* @param queue the queue to consume
43+
* @param consumerTag the tag indicate a consumer subscription
44+
*/
3345
public ConsumeOkEvent(Object source, String queue, String consumerTag) {
3446
super(source);
3547
this.queue = queue;
3648
this.consumerTag = consumerTag;
3749
}
3850

51+
/**
52+
* Obtain the queue name a consumer has been subscribed.
53+
* @return the queue name a consumer subscribed.
54+
* @since 1.7.7
55+
*/
56+
public String getQueue() {
57+
return this.queue;
58+
}
59+
60+
/**
61+
* Obtain the consumer tag assigned to the consumer.
62+
* @return the consumer tag for subscription.
63+
* @since 1.7.7
64+
*/
65+
public String getConsumerTag() {
66+
return this.consumerTag;
67+
}
68+
3969
@Override
4070
public String toString() {
4171
return "ConsumeOkEvent [queue=" + this.queue + ", consumerTag=" + this.consumerTag

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.hamcrest.Matchers.containsString;
2121
import static org.hamcrest.Matchers.equalTo;
2222
import static org.hamcrest.Matchers.instanceOf;
23+
import static org.hamcrest.Matchers.isOneOf;
2324
import static org.junit.Assert.assertEquals;
2425
import static org.junit.Assert.assertFalse;
2526
import static org.junit.Assert.assertNull;
@@ -90,6 +91,7 @@
9091
* @author Gunnar Hillert
9192
* @author Gary Russell
9293
* @author Artem Bilan
94+
*
9395
* @since 1.3
9496
*
9597
*/
@@ -283,7 +285,11 @@ public void publishEvent(ApplicationEvent event) {
283285
assertThat(events.size(), equalTo(8));
284286
assertThat(events.get(0), instanceOf(AsyncConsumerStartedEvent.class));
285287
assertThat(events.get(1), instanceOf(ConsumeOkEvent.class));
288+
ConsumeOkEvent consumeOkEvent = (ConsumeOkEvent) events.get(1);
289+
assertThat(consumeOkEvent.getQueue(), isOneOf(this.queue.getName(), this.queue1.getName()));
286290
assertThat(events.get(2), instanceOf(ConsumeOkEvent.class));
291+
consumeOkEvent = (ConsumeOkEvent) events.get(2);
292+
assertThat(consumeOkEvent.getQueue(), isOneOf(this.queue.getName(), this.queue1.getName()));
287293
assertSame(events.get(3), eventRef.get());
288294
assertThat(events.get(4), instanceOf(AsyncConsumerRestartedEvent.class));
289295
assertThat(events.get(5), instanceOf(ConsumeOkEvent.class));

0 commit comments

Comments
 (0)