Skip to content

Commit 909ba57

Browse files
LeonardoFerreiraagaryrussell
authored andcommitted
Fix PooledChannelConnectionFactory
Dont set defaultPublisherFactory = false when calling from constructor
1 parent cdf1a9c commit 909ba57

File tree

2 files changed

+106
-3
lines changed

2 files changed

+106
-3
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,7 @@
5050
* a callback.
5151
*
5252
* @author Gary Russell
53+
* @author Leonardo Ferreira
5354
* @since 2.3
5455
*
5556
*/
@@ -79,7 +80,7 @@ public PooledChannelConnectionFactory(ConnectionFactory rabbitConnectionFactory)
7980
private PooledChannelConnectionFactory(ConnectionFactory rabbitConnectionFactory, boolean isPublisher) {
8081
super(rabbitConnectionFactory);
8182
if (!isPublisher) {
82-
setPublisherConnectionFactory(new PooledChannelConnectionFactory(rabbitConnectionFactory, true));
83+
doSetPublisherConnectionFactory(new PooledChannelConnectionFactory(rabbitConnectionFactory, true));
8384
}
8485
else {
8586
this.defaultPublisherFactory = false;

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactoryTests.java

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

2121
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicInteger;
2223

2324
import org.junit.jupiter.api.Test;
2425

@@ -31,12 +32,14 @@
3132
import org.springframework.context.annotation.Configuration;
3233
import org.springframework.test.annotation.DirtiesContext;
3334
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
35+
import org.springframework.test.util.ReflectionTestUtils;
3436

3537
import com.rabbitmq.client.Channel;
3638
import com.rabbitmq.client.ConnectionFactory;
3739

3840
/**
3941
* @author Gary Russell
42+
* @author Leonardo Ferreira
4043
* @since 2.3
4144
*
4245
*/
@@ -97,6 +100,105 @@ void queueDeclared(@Autowired RabbitAdmin admin, @Autowired Config config,
97100
assertThat(config.closed).isTrue();
98101
}
99102

103+
@Test
104+
void copyConfigsToPublisherConnectionFactory() {
105+
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(new ConnectionFactory());
106+
AtomicInteger txConfiged = new AtomicInteger();
107+
AtomicInteger nonTxConfiged = new AtomicInteger();
108+
pcf.setPoolConfigurer((pool, tx) -> {
109+
if (tx) {
110+
txConfiged.incrementAndGet();
111+
}
112+
else {
113+
nonTxConfiged.incrementAndGet();
114+
}
115+
});
116+
117+
createAndCloseConnectionChannelTxAndChannelNonTx(pcf);
118+
119+
final org.springframework.amqp.rabbit.connection.ConnectionFactory publisherConnectionFactory = pcf
120+
.getPublisherConnectionFactory();
121+
assertThat(publisherConnectionFactory).isNotNull();
122+
123+
createAndCloseConnectionChannelTxAndChannelNonTx(publisherConnectionFactory);
124+
125+
assertThat(txConfiged.get()).isEqualTo(2);
126+
assertThat(nonTxConfiged.get()).isEqualTo(2);
127+
128+
final Object listenerPoolConfigurer = ReflectionTestUtils.getField(pcf, "poolConfigurer");
129+
final Object publisherPoolConfigurer = ReflectionTestUtils.getField(publisherConnectionFactory,
130+
"poolConfigurer");
131+
132+
assertThat(listenerPoolConfigurer)
133+
.isSameAs(publisherPoolConfigurer);
134+
135+
pcf.destroy();
136+
}
137+
138+
@Test
139+
void copyConfigsToPublisherConnectionFactoryWhenUsingCustomPublisherFactory() {
140+
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(new ConnectionFactory());
141+
AtomicBoolean listenerTxConfiged = new AtomicBoolean();
142+
AtomicBoolean listenerNonTxConfiged = new AtomicBoolean();
143+
pcf.setPoolConfigurer((pool, tx) -> {
144+
if (tx) {
145+
listenerTxConfiged.set(true);
146+
}
147+
else {
148+
listenerNonTxConfiged.set(true);
149+
}
150+
});
151+
152+
final PooledChannelConnectionFactory publisherConnectionFactory = new PooledChannelConnectionFactory(
153+
new ConnectionFactory());
154+
155+
AtomicBoolean publisherTxConfiged = new AtomicBoolean();
156+
AtomicBoolean publisherNonTxConfiged = new AtomicBoolean();
157+
publisherConnectionFactory.setPoolConfigurer((pool, tx) -> {
158+
if (tx) {
159+
publisherTxConfiged.set(true);
160+
}
161+
else {
162+
publisherNonTxConfiged.set(true);
163+
}
164+
});
165+
166+
pcf.setPublisherConnectionFactory(publisherConnectionFactory);
167+
168+
assertThat(pcf.getPublisherConnectionFactory()).isSameAs(publisherConnectionFactory);
169+
170+
createAndCloseConnectionChannelTxAndChannelNonTx(pcf);
171+
172+
assertThat(listenerTxConfiged.get()).isEqualTo(true);
173+
assertThat(listenerNonTxConfiged.get()).isEqualTo(true);
174+
175+
final Object listenerPoolConfigurer = ReflectionTestUtils.getField(pcf, "poolConfigurer");
176+
final Object publisherPoolConfigurer = ReflectionTestUtils.getField(publisherConnectionFactory,
177+
"poolConfigurer");
178+
179+
assertThat(listenerPoolConfigurer)
180+
.isNotSameAs(publisherPoolConfigurer);
181+
182+
createAndCloseConnectionChannelTxAndChannelNonTx(publisherConnectionFactory);
183+
184+
assertThat(publisherTxConfiged.get()).isEqualTo(true);
185+
assertThat(publisherNonTxConfiged.get()).isEqualTo(true);
186+
187+
pcf.destroy();
188+
}
189+
190+
private void createAndCloseConnectionChannelTxAndChannelNonTx(
191+
org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
192+
193+
Connection connection = connectionFactory.createConnection();
194+
Channel nonTxChannel = connection.createChannel(false);
195+
Channel txChannel = connection.createChannel(true);
196+
197+
RabbitUtils.closeChannel(nonTxChannel);
198+
RabbitUtils.closeChannel(txChannel);
199+
connection.close();
200+
}
201+
100202
@Configuration
101203
public static class Config {
102204

0 commit comments

Comments
 (0)