Skip to content

Commit 07414b0

Browse files
authored
AMQP-608: Add new connection factories
JIRA https://jira.spring.io/browse/AMQP-608 Initial Commit * Doc and Javadoc polishing.
1 parent 05472a5 commit 07414b0

File tree

12 files changed

+691
-10
lines changed

12 files changed

+691
-10
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ ext {
4141
assertjVersion = '3.15.0'
4242
assertkVersion = '0.20'
4343
commonsHttpClientVersion = '4.5.10'
44+
commonsPoolVersion = '2.8.0'
4445
googleJsr305Version = '3.0.2'
4546
hamcrestVersion = '2.2'
4647
jacksonVersion = '2.10.3'
@@ -334,6 +335,7 @@ project('spring-rabbit') {
334335
exclude group: 'org.springframework'
335336
}
336337
optionalApi "com.jayway.jsonpath:json-path:$jaywayJsonPathVersion"
338+
optionalApi "org.apache.commons:commons-pool2:$commonsPoolVersion"
337339

338340
testApi project(':spring-rabbit-junit')
339341
testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/Connection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,10 @@ public interface Connection extends AutoCloseable {
8888
return null;
8989
}
9090

91+
/**
92+
* Close any channel associated with the current thread.
93+
*/
94+
default void closeThreadChannel() {
95+
}
96+
9197
}
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.connection;
18+
19+
import java.io.IOException;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.function.BiConsumer;
22+
23+
import org.aopalliance.aop.Advice;
24+
import org.aopalliance.intercept.MethodInterceptor;
25+
import org.aopalliance.intercept.MethodInvocation;
26+
import org.apache.commons.logging.Log;
27+
import org.apache.commons.pool2.ObjectPool;
28+
import org.apache.commons.pool2.PooledObject;
29+
import org.apache.commons.pool2.PooledObjectFactory;
30+
import org.apache.commons.pool2.impl.DefaultPooledObject;
31+
import org.apache.commons.pool2.impl.GenericObjectPool;
32+
33+
import org.springframework.amqp.AmqpException;
34+
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
35+
import org.springframework.aop.framework.ProxyFactory;
36+
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
37+
import org.springframework.util.Assert;
38+
39+
import com.rabbitmq.client.Channel;
40+
import com.rabbitmq.client.ConnectionFactory;
41+
42+
/**
43+
* A very simple connection factory that caches channels using Apache Pool2
44+
* {@link GenericObjectPool}s (one for transactional and one for non-transactional
45+
* channels). The pools have default configuration but they can be configured using
46+
* a callback.
47+
*
48+
* @author Gary Russell
49+
* @since 2.3
50+
*
51+
*/
52+
public class PooledChannelConnectionFactory extends AbstractConnectionFactory {
53+
54+
private volatile ConnectionWrapper connection;
55+
56+
private boolean simplePublisherConfirms;
57+
58+
private BiConsumer<GenericObjectPool<Channel>, Boolean> poolConfigurer = (pool, tx) -> { };
59+
60+
/**
61+
* Construct an instance.
62+
*
63+
* @param rabbitConnectionFactory the rabbitmq connection factory.
64+
*/
65+
public PooledChannelConnectionFactory(ConnectionFactory rabbitConnectionFactory) {
66+
this(rabbitConnectionFactory, false);
67+
}
68+
69+
/**
70+
* Construct an instance.
71+
*
72+
* @param rabbitConnectionFactory the rabbitmq connection factory.
73+
* @param isPublisher true if we are creating a publisher connection factory.
74+
*/
75+
private PooledChannelConnectionFactory(ConnectionFactory rabbitConnectionFactory, boolean isPublisher) {
76+
super(rabbitConnectionFactory);
77+
if (!isPublisher) {
78+
setPublisherConnectionFactory(new PooledChannelConnectionFactory(rabbitConnectionFactory, true));
79+
}
80+
}
81+
82+
/**
83+
* Add a consumer to configure the object pool. The second argument is true when
84+
* called with the transactional pool.
85+
* @param poolConfigurer the configurer.
86+
*/
87+
public void setPoolConfigurer(BiConsumer<GenericObjectPool<Channel>, Boolean> poolConfigurer) {
88+
Assert.notNull(poolConfigurer, "'poolConfigurer' cannot be null");
89+
this.poolConfigurer = poolConfigurer;
90+
}
91+
92+
@Override
93+
public boolean isSimplePublisherConfirms() {
94+
return this.simplePublisherConfirms;
95+
}
96+
97+
/**
98+
* Enable simple publisher confirms.
99+
* @param simplePublisherConfirms true to enable.
100+
*/
101+
public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
102+
this.simplePublisherConfirms = simplePublisherConfirms;
103+
}
104+
105+
@Override
106+
public synchronized Connection createConnection() throws AmqpException {
107+
if (this.connection == null || !this.connection.isOpen()) {
108+
Connection bareConnection = createBareConnection();
109+
this.connection = new ConnectionWrapper(bareConnection.getDelegate(), getCloseTimeout(),
110+
this.simplePublisherConfirms, this.logger, this.poolConfigurer);
111+
}
112+
return this.connection;
113+
}
114+
115+
@Override
116+
public synchronized void destroy() {
117+
super.destroy();
118+
if (this.connection != null) {
119+
this.connection.forceClose();
120+
this.connection = null;
121+
}
122+
}
123+
124+
private final static class ConnectionWrapper extends SimpleConnection {
125+
126+
private final Log logger;
127+
128+
private final ObjectPool<Channel> channels;
129+
130+
private final ObjectPool<Channel> txChannels;
131+
132+
private final boolean simplePublisherConfirms;
133+
134+
ConnectionWrapper(com.rabbitmq.client.Connection delegate, int closeTimeout, boolean simplePublisherConfirms,
135+
Log logger, BiConsumer<GenericObjectPool<Channel>, Boolean> configurer) {
136+
137+
super(delegate, closeTimeout);
138+
GenericObjectPool<Channel> pool = new GenericObjectPool<>(new ChannelFactory());
139+
configurer.accept(pool, false);
140+
this.channels = pool;
141+
pool = new GenericObjectPool<>(new TxChannelFactory());
142+
configurer.accept(pool, true);
143+
this.txChannels = pool;
144+
this.simplePublisherConfirms = simplePublisherConfirms;
145+
this.logger = logger;
146+
}
147+
148+
@Override
149+
public Channel createChannel(boolean transactional) {
150+
try {
151+
return transactional ? this.txChannels.borrowObject() : this.channels.borrowObject();
152+
}
153+
catch (Exception e) {
154+
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
155+
}
156+
}
157+
158+
private Channel createProxy(Channel channel, boolean transacted) {
159+
ProxyFactory pf = new ProxyFactory(channel);
160+
AtomicReference<Channel> proxy = new AtomicReference<>();
161+
Advice advice = new MethodInterceptor() {
162+
163+
@Override
164+
public Object invoke(MethodInvocation invocation) throws Throwable {
165+
if (transacted) {
166+
ConnectionWrapper.this.txChannels.returnObject(proxy.get());
167+
}
168+
else {
169+
ConnectionWrapper.this.channels.returnObject(proxy.get());
170+
}
171+
return null;
172+
}
173+
174+
};
175+
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice);
176+
advisor.addMethodName("close");
177+
pf.addAdvisor(advisor);
178+
proxy.set((Channel) pf.getProxy());
179+
return proxy.get();
180+
}
181+
182+
@Override
183+
public void close() {
184+
}
185+
186+
void forceClose() {
187+
super.close();
188+
this.channels.close();
189+
this.txChannels.close();
190+
}
191+
192+
private class ChannelFactory implements PooledObjectFactory<Channel> {
193+
194+
@Override
195+
public PooledObject<Channel> makeObject() throws Exception {
196+
Channel channel = ConnectionWrapper.super.createChannel(false);
197+
if (ConnectionWrapper.this.simplePublisherConfirms) {
198+
try {
199+
channel.confirmSelect();
200+
}
201+
catch (IOException e) {
202+
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
203+
}
204+
}
205+
return new DefaultPooledObject<>(createProxy(channel, false));
206+
}
207+
208+
@Override
209+
public void destroyObject(PooledObject<Channel> p) throws Exception {
210+
p.getObject().close();
211+
}
212+
213+
@Override
214+
public boolean validateObject(PooledObject<Channel> p) {
215+
return p.getObject().isOpen();
216+
}
217+
218+
@Override
219+
public void activateObject(PooledObject<Channel> p) {
220+
}
221+
222+
@Override
223+
public void passivateObject(PooledObject<Channel> p) {
224+
}
225+
226+
}
227+
228+
private final class TxChannelFactory extends ChannelFactory {
229+
230+
@Override
231+
public PooledObject<Channel> makeObject() throws Exception {
232+
Channel channel = ConnectionWrapper.super.createChannel(true);
233+
try {
234+
channel.txSelect();
235+
}
236+
catch (IOException e) {
237+
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
238+
}
239+
return new DefaultPooledObject<>(createProxy(channel, true));
240+
}
241+
242+
}
243+
244+
}
245+
246+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitAccessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void setChannelTransacted(boolean transactional) {
5858
*
5959
* @param connectionFactory The connection factory.
6060
*/
61-
public final void setConnectionFactory(ConnectionFactory connectionFactory) {
61+
public void setConnectionFactory(ConnectionFactory connectionFactory) {
6262
this.connectionFactory = connectionFactory;
6363
}
6464

0 commit comments

Comments
 (0)