Skip to content

Commit f14a894

Browse files
Will Drosteartembilan
authored andcommitted
AMQP-824: Name for deferredCloseExec thread pool
JIRA https://jira.spring.io/browse/AMQP-824 Taking the comments into account Fix build * Polishing for code style **Cherry-pick to 2.0.x & 2.1.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java
1 parent 4af4db4 commit f14a894

File tree

3 files changed

+60
-10
lines changed

3 files changed

+60
-10
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -58,6 +58,7 @@
5858
* @author Gary Russell
5959
* @author Steve Powell
6060
* @author Artem Bilan
61+
* @author Will Droste
6162
*
6263
*/
6364
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware,
@@ -428,7 +429,14 @@ public void setBeanName(String name) {
428429
}
429430
}
430431

431-
public boolean hasPublisherConnectionFactory() {
432+
/**
433+
* Return a bean name of the component or null if not a bean.
434+
* @return the bean name or null.
435+
* @since 1.7.9
436+
*/
437+
protected String getBeanName() {
438+
return this.beanName;
439+
} public boolean hasPublisherConnectionFactory() {
432440
return this.publisherConnectionFactory != null;
433441
}
434442

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.Executors;
3737
import java.util.concurrent.LinkedBlockingDeque;
3838
import java.util.concurrent.Semaphore;
39+
import java.util.concurrent.ThreadFactory;
3940
import java.util.concurrent.TimeUnit;
4041
import java.util.concurrent.TimeoutException;
4142
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,6 +53,7 @@
5253
import org.springframework.beans.factory.InitializingBean;
5354
import org.springframework.jmx.export.annotation.ManagedAttribute;
5455
import org.springframework.jmx.export.annotation.ManagedResource;
56+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
5557
import org.springframework.util.Assert;
5658
import org.springframework.util.ObjectUtils;
5759
import org.springframework.util.StringUtils;
@@ -89,13 +91,21 @@
8991
* @author Gary Russell
9092
* @author Artem Bilan
9193
* @author Steve Powell
94+
* @author Will Droste
9295
*/
9396
@ManagedResource
9497
public class CachingConnectionFactory extends AbstractConnectionFactory
9598
implements InitializingBean, ShutdownListener, PublisherCallbackChannelConnectionFactory {
9699

97100
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
98101

102+
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
103+
104+
/**
105+
* Create a unique ID for the pool.
106+
*/
107+
private static final AtomicInteger threadPoolId = new AtomicInteger();
108+
99109
private static final Set<String> txStarts = new HashSet<>(Arrays.asList("basicPublish", "basicAck",
100110
"basicNack", "basicReject"));
101111

@@ -169,6 +179,10 @@ public enum CacheMode {
169179

170180
/** Executor used for deferred close if no explicit executor set. */
171181
private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool();
182+
/**
183+
* Executor used for deferred close if no explicit executor set.
184+
*/
185+
private ExecutorService deferredCloseExecutor;
172186

173187

174188
/**
@@ -734,7 +748,9 @@ public final void destroy() {
734748
resetConnection();
735749
if (getContextStopped()) {
736750
this.stopped = true;
737-
this.deferredCloseExecutor.shutdownNow();
751+
if (this.deferredCloseExecutor != null) {
752+
this.deferredCloseExecutor.shutdownNow();
753+
}
738754
}
739755
}
740756

@@ -878,6 +894,28 @@ private int countOpenConnections() {
878894
return n;
879895
}
880896

897+
/**
898+
* Determine the executor service used to close connections.
899+
* @return specified executor service otherwise the default one is created and returned.
900+
* @since 1.7.9
901+
*/
902+
protected ExecutorService getDeferredCloseExecutor() {
903+
if (getExecutorService() != null) {
904+
return getExecutorService();
905+
}
906+
synchronized (this.connectionMonitor) {
907+
if (this.deferredCloseExecutor == null) {
908+
final String threadPrefix =
909+
getBeanName() == null
910+
? DEFAULT_DEFERRED_POOL_PREFIX + threadPoolId.incrementAndGet()
911+
: getBeanName();
912+
ThreadFactory threadPoolFactory = new CustomizableThreadFactory(threadPrefix);
913+
this.deferredCloseExecutor = Executors.newCachedThreadPool(threadPoolFactory);
914+
}
915+
}
916+
return this.deferredCloseExecutor;
917+
}
918+
881919
@Override
882920
public String toString() {
883921
return "CachingConnectionFactory [channelCacheSize=" + this.channelCacheSize + ", host=" + getHost()
@@ -1119,9 +1157,7 @@ private void physicalClose() throws Exception {
11191157
}
11201158

11211159
private void asyncClose() {
1122-
ExecutorService executorService = (getExecutorService() != null
1123-
? getExecutorService()
1124-
: CachingConnectionFactory.this.deferredCloseExecutor);
1160+
ExecutorService executorService = getDeferredCloseExecutor();
11251161
final Channel channel = CachedChannelInvocationHandler.this.target;
11261162
executorService.execute(() -> {
11271163
try {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerPlaceholderParserTests.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2016 the original author or authors.
2+
* Copyright 2010-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.assertTrue;
2121

2222
import java.util.Arrays;
23+
import java.util.concurrent.ExecutorService;
2324
import java.util.concurrent.ThreadPoolExecutor;
2425

2526
import org.junit.After;
@@ -42,6 +43,7 @@
4243
/**
4344
* @author Dave Syer
4445
* @author Gary Russell
46+
* @author Will Droste
4547
*/
4648
public final class ListenerContainerPlaceholderParserTests {
4749

@@ -58,14 +60,18 @@ public void closeBeanFactory() throws Exception {
5860
if (this.context != null) {
5961
CachingConnectionFactory cf = this.context.getBean(CachingConnectionFactory.class);
6062
this.context.close();
61-
assertTrue(TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class)
62-
.isTerminated());
63+
ExecutorService es = TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class);
64+
if (es != null) {
65+
// if it gets started make sure its terminated..
66+
assertTrue(es.isTerminated());
67+
}
6368
}
6469
}
6570

6671
@Test
6772
public void testParseWithQueueNames() throws Exception {
68-
SimpleMessageListenerContainer container = this.context.getBean("testListener", SimpleMessageListenerContainer.class);
73+
SimpleMessageListenerContainer container =
74+
this.context.getBean("testListener", SimpleMessageListenerContainer.class);
6975
assertEquals(AcknowledgeMode.MANUAL, container.getAcknowledgeMode());
7076
assertEquals(this.context.getBean(ConnectionFactory.class), container.getConnectionFactory());
7177
assertEquals(MessageListenerAdapter.class, container.getMessageListener().getClass());

0 commit comments

Comments
 (0)