Skip to content

Commit 46151a7

Browse files
garyrussellartembilan
authored andcommitted
AMQP-804: Add BrokerEventListener
JIRA: https://jira.spring.io/browse/AMQP-804 Bind an auto-delete queue to the `amq.rabbitmq.events` exchange with a configurable list of event keys. Initial commit. * Polishing * Polishing - PR Comments * Polishing and docs; add stop/start to test; attempt rebind after stop/start * Doc polishing * Polishing - PR Comments
1 parent 5989d03 commit 46151a7

File tree

5 files changed

+447
-1
lines changed

5 files changed

+447
-1
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener;
18+
19+
import java.util.Map;
20+
21+
import org.springframework.amqp.core.MessageProperties;
22+
import org.springframework.amqp.event.AmqpEvent;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
* Represents a broker event generated by the Event Exchange Plugin
27+
* (https://www.rabbitmq.com/event-exchange.html).
28+
*
29+
* @author Gary Russell
30+
* @since 2.1
31+
*
32+
*/
33+
@SuppressWarnings("serial")
34+
public class BrokerEvent extends AmqpEvent {
35+
36+
private final MessageProperties properties;
37+
38+
/**
39+
* Create an instance with the provided source and properties.
40+
* @param source the source.
41+
* @param properties the properties.
42+
*/
43+
public BrokerEvent(Object source, MessageProperties properties) {
44+
super(source);
45+
Assert.notNull(properties, "MessageProperties cannot be null");
46+
this.properties = properties;
47+
}
48+
49+
/**
50+
* The event type ({@link MessageProperties#getReceivedRoutingKey()}).
51+
* @return the type.
52+
*/
53+
public String getEventType() {
54+
return this.properties.getReceivedRoutingKey();
55+
}
56+
57+
/**
58+
* Properties of the event {@link MessageProperties#getHeaders()}.
59+
* @return the properties.
60+
*/
61+
public Map<String, Object> getEventProperties() {
62+
return this.properties.getHeaders();
63+
}
64+
65+
/**
66+
* The complete {@link MessageProperties} from the event representing the event.
67+
* (The body is always blank).
68+
* @return the message properties.
69+
*/
70+
public MessageProperties getMessageProperties() {
71+
return this.properties;
72+
}
73+
74+
@Override
75+
public String toString() {
76+
return "BrokerEvent [eventType=" + this.getEventType() + ", properties=" + this.getEventProperties() + "]";
77+
}
78+
79+
}
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener;
18+
19+
import java.util.Arrays;
20+
21+
import org.apache.commons.logging.Log;
22+
import org.apache.commons.logging.LogFactory;
23+
24+
import org.springframework.amqp.core.AnonymousQueue;
25+
import org.springframework.amqp.core.Base64UrlNamingStrategy;
26+
import org.springframework.amqp.core.Binding;
27+
import org.springframework.amqp.core.BindingBuilder;
28+
import org.springframework.amqp.core.Message;
29+
import org.springframework.amqp.core.MessageListener;
30+
import org.springframework.amqp.core.Queue;
31+
import org.springframework.amqp.core.TopicExchange;
32+
import org.springframework.amqp.rabbit.connection.Connection;
33+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
34+
import org.springframework.amqp.rabbit.connection.ConnectionListener;
35+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
36+
import org.springframework.context.ApplicationEventPublisher;
37+
import org.springframework.context.ApplicationEventPublisherAware;
38+
import org.springframework.context.SmartLifecycle;
39+
import org.springframework.lang.Nullable;
40+
import org.springframework.util.Assert;
41+
import org.springframework.util.ObjectUtils;
42+
43+
/**
44+
* When the event-exchange-plugin is enabled (see
45+
* https://www.rabbitmq.com/event-exchange.html), if an object of this type is declared as
46+
* a bean, selected events will be published as {@link BrokerEvent}s. Such events can then
47+
* be consumed using an {@code ApplicationListener} or {@code @EventListener} method.
48+
* An {@link AnonymousQueue} will be bound to the {@code amq.rabbitmq.event} topic exchange
49+
* with the supplied keys.
50+
*
51+
* @author Gary Russell
52+
* @since 2.1
53+
*
54+
*/
55+
public class BrokerEventListener implements MessageListener, ApplicationEventPublisherAware, ConnectionListener,
56+
SmartLifecycle {
57+
58+
private static final Log logger = LogFactory.getLog(BrokerEventListener.class);
59+
60+
private final AbstractMessageListenerContainer container;
61+
62+
private final String[] eventKeys;
63+
64+
private final RabbitAdmin admin;
65+
66+
private final Queue eventQueue = new AnonymousQueue(new Base64UrlNamingStrategy("spring.events."));
67+
68+
private final boolean ownContainer;
69+
70+
private int phase;
71+
72+
private boolean autoStartup = true;
73+
74+
private boolean running;
75+
76+
private boolean stopInvoked;
77+
78+
private Exception bindingsFailedException;
79+
80+
private ApplicationEventPublisher applicationEventPublisher;
81+
82+
/**
83+
* Construct an instance using the supplied connection factory and event keys. Event
84+
* keys are patterns to match routing keys for events published to the
85+
* {@code amq.rabbitmq.event} topic exchange. They can therefore match wildcards;
86+
* examples are {@code user.#, queue.created}. Refer to the plugin documentation for
87+
* information about available events. A single-threaded
88+
* {@link DirectMessageListenerContainer} will be created; its lifecycle will be
89+
* controlled by this object's {@link SmartLifecycle} methods.
90+
* @param connectionFactory the connection factory.
91+
* @param eventKeys the event keys.
92+
*/
93+
public BrokerEventListener(ConnectionFactory connectionFactory, String... eventKeys) {
94+
this(new DirectMessageListenerContainer(connectionFactory), true, eventKeys);
95+
}
96+
97+
/**
98+
* Construct an instance using the supplied listener container factory and event keys.
99+
* Event keys are patterns to match routing keys for events published to the
100+
* {@code amq.rabbitmq.event} topic exchange. They can therefore match wildcards;
101+
* examples are {@code user.#, queue.created}. Refer to the plugin documentation for
102+
* information about available events. The container's lifecycle will be not be
103+
* controlled by this object's {@link SmartLifecycle} methods. The container should
104+
* not be configured with queues or a {@link MessageListener}; those properties will
105+
* be replaced.
106+
* @param container the listener container.
107+
* @param eventKeys the event keys.
108+
*/
109+
public BrokerEventListener(AbstractMessageListenerContainer container, String... eventKeys) {
110+
this(container, false, eventKeys);
111+
}
112+
113+
private BrokerEventListener(AbstractMessageListenerContainer container, boolean ownContainer, String... eventKeys) {
114+
Assert.notNull(container, "listener container cannot be null");
115+
Assert.isTrue(!ObjectUtils.isEmpty(eventKeys), "At least one event key is required");
116+
this.container = container;
117+
this.container.setQueues(this.eventQueue);
118+
this.container.setMessageListener(this);
119+
this.eventKeys = Arrays.copyOf(eventKeys, eventKeys.length);
120+
this.container.getConnectionFactory().addConnectionListener(this);
121+
this.admin = new RabbitAdmin(this.container.getConnectionFactory());
122+
this.ownContainer = ownContainer;
123+
}
124+
125+
@Override
126+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
127+
this.applicationEventPublisher = applicationEventPublisher;
128+
}
129+
130+
/**
131+
* Return any exception thrown when attempting to bind the queue to the event exchange.
132+
* @return the exception.
133+
*/
134+
public @Nullable Exception getBindingsFailedException() {
135+
return this.bindingsFailedException;
136+
}
137+
138+
@Override
139+
public synchronized void start() {
140+
if (!this.running) {
141+
if (this.stopInvoked) {
142+
// redeclare auto-delete queue
143+
this.stopInvoked = false;
144+
onCreate(null);
145+
}
146+
if (this.ownContainer) {
147+
this.container.start();
148+
}
149+
this.running = true;
150+
}
151+
}
152+
153+
@Override
154+
public synchronized void stop() {
155+
if (this.running) {
156+
if (this.ownContainer) {
157+
this.container.stop();
158+
}
159+
this.running = false;
160+
this.stopInvoked = true;
161+
}
162+
}
163+
164+
@Override
165+
public synchronized boolean isRunning() {
166+
return this.running;
167+
}
168+
169+
@Override
170+
public int getPhase() {
171+
return this.phase;
172+
}
173+
174+
public void setPhase(int phase) {
175+
this.phase = phase;
176+
}
177+
178+
@Override
179+
public boolean isAutoStartup() {
180+
return this.autoStartup;
181+
}
182+
183+
public void setAutoStartup(boolean autoStartup) {
184+
this.autoStartup = autoStartup;
185+
}
186+
187+
@Override
188+
public void stop(Runnable callback) {
189+
stop();
190+
callback.run();
191+
}
192+
193+
@Override
194+
public void onMessage(Message message) {
195+
if (this.applicationEventPublisher != null) {
196+
this.applicationEventPublisher.publishEvent(new BrokerEvent(this, message.getMessageProperties()));
197+
}
198+
else {
199+
if (logger.isWarnEnabled()) {
200+
logger.warn("No event publisher available for " + message + "; if the BrokerEventListener "
201+
+ "is not defined as a bean, you must provide an ApplicationEventPublisher");
202+
}
203+
}
204+
}
205+
206+
@Override
207+
public void onCreate(Connection connection) {
208+
this.bindingsFailedException = null;
209+
TopicExchange exchange = new TopicExchange("amq.rabbitmq.event");
210+
try {
211+
this.admin.declareQueue(this.eventQueue);
212+
Arrays.stream(this.eventKeys).forEach(k -> {
213+
Binding binding = BindingBuilder.bind(this.eventQueue).to(exchange).with(k);
214+
this.admin.declareBinding(binding);
215+
});
216+
}
217+
catch (Exception e) {
218+
logger.error("failed to declare event queue/bindings - is the plugin enabled?", e);
219+
this.bindingsFailedException = e;
220+
}
221+
222+
}
223+
224+
}

0 commit comments

Comments
 (0)