Skip to content

Commit a69920d

Browse files
garyrussellartembilan
authored andcommitted
GH-1106: Fix Use Publisher CF with RT.invoke()
Fixes #1106 `RabbitTemplate.invoke()` did not honor `usePublisherConnection`.
1 parent d53ab9b commit a69920d

File tree

3 files changed

+50
-3
lines changed

3 files changed

+50
-3
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ public static void registerDeliveryTag(ConnectionFactory connectionFactory, Chan
205205
*/
206206
public static Connection createConnection(final ConnectionFactory connectionFactory,
207207
final boolean publisherConnectionIfPossible) {
208+
208209
if (publisherConnectionIfPossible) {
209210
ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
210211
if (publisherFactory != null) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2149,16 +2149,21 @@ public <T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.
21492149
RabbitResourceHolder resourceHolder = null;
21502150
Connection connection = null; // NOSONAR (close)
21512151
Channel channel;
2152+
ConnectionFactory connectionFactory = getConnectionFactory();
21522153
if (isChannelTransacted()) {
2153-
resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(getConnectionFactory(), true);
2154+
resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(connectionFactory, true,
2155+
this.usePublisherConnection);
21542156
channel = resourceHolder.getChannel();
21552157
if (channel == null) {
21562158
ConnectionFactoryUtils.releaseResources(resourceHolder);
21572159
throw new IllegalStateException("Resource holder returned a null channel");
21582160
}
21592161
}
21602162
else {
2161-
connection = getConnectionFactory().createConnection(); // NOSONAR - RabbitUtils
2163+
if (this.usePublisherConnection && connectionFactory.getPublisherConnectionFactory() != null) {
2164+
connectionFactory = connectionFactory.getPublisherConnectionFactory();
2165+
}
2166+
connection = connectionFactory.createConnection(); // NOSONAR - RabbitUtils
21622167
if (connection == null) {
21632168
throw new IllegalStateException("Connection factory returned a null connection");
21642169
}
@@ -2167,7 +2172,7 @@ public <T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.
21672172
if (channel == null) {
21682173
throw new IllegalStateException("Connection returned a null channel");
21692174
}
2170-
if (!getConnectionFactory().isPublisherConfirms()) {
2175+
if (!connectionFactory.isPublisherConfirms()) {
21712176
RabbitUtils.setPhysicalCloseRequired(channel, true);
21722177
}
21732178
this.dedicatedChannels.set(channel);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,47 @@ public void testAddAndRemoveAfterReceivePostProcessors() {
476476
assertThat(afterReceivePostProcessors, Matchers.contains(mpp2, mpp3));
477477
}
478478

479+
@Test
480+
public void testPublisherConnWithInvoke() {
481+
org.springframework.amqp.rabbit.connection.ConnectionFactory cf = mock(
482+
org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
483+
org.springframework.amqp.rabbit.connection.ConnectionFactory pcf = mock(
484+
org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
485+
given(cf.getPublisherConnectionFactory()).willReturn(pcf);
486+
RabbitTemplate template = new RabbitTemplate(cf);
487+
template.setUsePublisherConnection(true);
488+
org.springframework.amqp.rabbit.connection.Connection conn = mock(
489+
org.springframework.amqp.rabbit.connection.Connection.class);
490+
Channel channel = mock(Channel.class);
491+
given(pcf.createConnection()).willReturn(conn);
492+
given(conn.isOpen()).willReturn(true);
493+
given(conn.createChannel(false)).willReturn(channel);
494+
template.invoke(t -> null);
495+
verify(pcf).createConnection();
496+
verify(conn).createChannel(false);
497+
}
498+
499+
@Test
500+
public void testPublisherConnWithInvokeInTx() {
501+
org.springframework.amqp.rabbit.connection.ConnectionFactory cf = mock(
502+
org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
503+
org.springframework.amqp.rabbit.connection.ConnectionFactory pcf = mock(
504+
org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
505+
given(cf.getPublisherConnectionFactory()).willReturn(pcf);
506+
RabbitTemplate template = new RabbitTemplate(cf);
507+
template.setUsePublisherConnection(true);
508+
template.setChannelTransacted(true);
509+
org.springframework.amqp.rabbit.connection.Connection conn = mock(
510+
org.springframework.amqp.rabbit.connection.Connection.class);
511+
Channel channel = mock(Channel.class);
512+
given(pcf.createConnection()).willReturn(conn);
513+
given(conn.isOpen()).willReturn(true);
514+
given(conn.createChannel(true)).willReturn(channel);
515+
template.invoke(t -> null);
516+
verify(pcf).createConnection();
517+
verify(conn).createChannel(true);
518+
}
519+
479520
@SuppressWarnings("serial")
480521
private class TestTransactionManager extends AbstractPlatformTransactionManager {
481522

0 commit comments

Comments
 (0)