Skip to content

Commit fee8a97

Browse files
garyrussellartembilan
authored andcommitted
GH-1026: Fix Delay with CacheMode.CONNECTION
Fixes #1026 When using a `channelCheckoutTimeout` with `CacheModeConnection`, we incorrectly spin waiting for a connection until the timeout expires. We should only wait for a connection if the limit is exceeded. **cherry-pick to all supported** (cherry picked from commit dde7a37) # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java
1 parent 34e614a commit fee8a97

File tree

2 files changed

+33
-9
lines changed

2 files changed

+33
-9
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -598,15 +598,17 @@ public final Connection createConnection() throws AmqpException {
598598
else if (this.cacheMode == CacheMode.CONNECTION) {
599599
ChannelCachingConnectionProxy connection = findIdleConnection();
600600
long now = System.currentTimeMillis();
601-
while (connection == null && System.currentTimeMillis() - now < this.channelCheckoutTimeout) {
602-
if (countOpenConnections() >= this.connectionLimit) {
603-
try {
604-
this.connectionMonitor.wait(this.channelCheckoutTimeout);
605-
connection = findIdleConnection();
606-
}
607-
catch (InterruptedException e) {
608-
Thread.currentThread().interrupt();
609-
throw new AmqpException("Interrupted while waiting for a connection", e);
601+
if (connection == null && countOpenConnections() >= this.connectionLimit) {
602+
while (connection == null && System.currentTimeMillis() - now < this.channelCheckoutTimeout) {
603+
if (countOpenConnections() >= this.connectionLimit) {
604+
try {
605+
this.connectionMonitor.wait(this.channelCheckoutTimeout);
606+
connection = findIdleConnection();
607+
}
608+
catch (InterruptedException e) {
609+
Thread.currentThread().interrupt();
610+
throw new AmqpException("Interrupted while waiting for a connection", e);
611+
}
610612
}
611613
}
612614
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616

1717
package org.springframework.amqp.rabbit.connection;
1818

19+
import static org.hamcrest.Matchers.lessThan;
1920
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNotNull;
2122
import static org.junit.Assert.assertNotSame;
2223
import static org.junit.Assert.assertNull;
2324
import static org.junit.Assert.assertSame;
25+
import static org.junit.Assert.assertThat;
2426
import static org.junit.Assert.assertTrue;
2527
import static org.junit.Assert.fail;
2628
import static org.mockito.AdditionalMatchers.aryEq;
29+
import static org.mockito.BDDMockito.given;
2730
import static org.mockito.Matchers.any;
2831
import static org.mockito.Matchers.anyInt;
2932
import static org.mockito.Matchers.anyString;
@@ -1597,4 +1600,23 @@ public void testReturnsNormalCloseDeferredClose() throws Exception {
15971600
Thread.sleep(6000);
15981601
}
15991602

1603+
@Test
1604+
public void testFirstConnectionDoesntWait() throws IOException, TimeoutException {
1605+
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
1606+
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
1607+
Channel mockChannel = mock(Channel.class);
1608+
1609+
given(mockConnectionFactory.newConnection((ExecutorService) isNull(), anyString())).willReturn(mockConnection);
1610+
given(mockConnection.createChannel()).willReturn(mockChannel);
1611+
given(mockChannel.isOpen()).willReturn(true);
1612+
given(mockConnection.isOpen()).willReturn(true);
1613+
1614+
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
1615+
ccf.setCacheMode(CacheMode.CONNECTION);
1616+
ccf.setChannelCheckoutTimeout(60000);
1617+
long t1 = System.currentTimeMillis();
1618+
ccf.createConnection();
1619+
assertThat(System.currentTimeMillis() - t1, lessThan(30_000L));
1620+
}
1621+
16001622
}

0 commit comments

Comments
 (0)