Skip to content

Commit acb5683

Browse files
garyrussellartembilan
authored andcommitted
GH-1067: Add enum for publisher confirm type
Resolves #1067 * * Refactor deprecated setters to use new setter * * Fix this.publisherConnectionFactory typo
1 parent bbc4eba commit acb5683

File tree

14 files changed

+130
-55
lines changed

14 files changed

+130
-55
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
111111
NamespaceUtils.setValueIfAttributeDefined(builder, element, FACTORY_TIMEOUT, "channelCheckoutTimeout");
112112
NamespaceUtils.setValueIfAttributeDefined(builder, element, CONNECTION_LIMIT);
113113
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, "connection-name-strategy");
114+
NamespaceUtils.setValueIfAttributeDefined(builder, element, "confirm-type", "publisherConfirmType");
114115
}
115116

116117
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,41 @@ public class CachingConnectionFactory extends AbstractConnectionFactory
129129
* The cache mode.
130130
*/
131131
public enum CacheMode {
132+
132133
/**
133134
* Cache channels - single connection.
134135
*/
135136
CHANNEL,
137+
136138
/**
137139
* Cache connections and channels within each connection.
138140
*/
139141
CONNECTION
142+
143+
}
144+
145+
/**
146+
* The type of publisher confirms to use.
147+
*/
148+
public enum ConfirmType {
149+
150+
/**
151+
* Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
152+
* within scoped operations.
153+
*/
154+
SIMPLE,
155+
156+
/**
157+
* Use with {@code CorrelationData} to correlate confirmations with sent
158+
* messsages.
159+
*/
160+
CORRELATED,
161+
162+
/**
163+
* Publisher confirms are disabled (default).
164+
*/
165+
NONE
166+
140167
}
141168

142169
private final Set<ChannelCachingConnectionProxy> allocatedConnections = new HashSet<>();
@@ -176,9 +203,7 @@ public enum CacheMode {
176203

177204
private int connectionLimit = Integer.MAX_VALUE;
178205

179-
private boolean publisherConfirms;
180-
181-
private boolean simplePublisherConfirms;
206+
private ConfirmType confirmType = ConfirmType.NONE;
182207

183208
private boolean publisherReturns;
184209

@@ -356,7 +381,7 @@ public void setConnectionLimit(int connectionLimit) {
356381

357382
@Override
358383
public boolean isPublisherConfirms() {
359-
return this.publisherConfirms;
384+
return ConfirmType.CORRELATED.equals(this.confirmType);
360385
}
361386

362387
@Override
@@ -372,36 +397,50 @@ public void setPublisherReturns(boolean publisherReturns) {
372397
}
373398

374399
/**
375-
* Use full publisher confirms, with correlation data and a callback for each message.
400+
* Use full (correlated) publisher confirms, with correlation data and a callback for
401+
* each message.
376402
* @param publisherConfirms true for full publisher returns,
377403
* @since 1.1
404+
* @deprecated in favor of {@link #setPublisherConfirmType(ConfirmType)}.
378405
* @see #setSimplePublisherConfirms(boolean)
379406
*/
407+
@Deprecated
380408
public void setPublisherConfirms(boolean publisherConfirms) {
381-
Assert.isTrue(!this.simplePublisherConfirms, "Cannot set both publisherConfirms and simplePublisherConfirms");
382-
this.publisherConfirms = publisherConfirms;
383-
if (this.publisherConnectionFactory != null) {
384-
this.publisherConnectionFactory.setPublisherConfirms(publisherConfirms);
385-
}
409+
Assert.isTrue(!ConfirmType.SIMPLE.equals(this.confirmType),
410+
"Cannot set both publisherConfirms and simplePublisherConfirms");
411+
setPublisherConfirmType(ConfirmType.CORRELATED);
386412
}
387413

388414
/**
389415
* Use simple publisher confirms where the template simply waits for completion.
390416
* @param simplePublisherConfirms true for confirms.
391417
* @since 2.1
418+
* @deprecated in favor of {@link #setPublisherConfirmType(ConfirmType)}.
392419
* @see #setPublisherConfirms(boolean)
393420
*/
421+
@Deprecated
394422
public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
395-
Assert.isTrue(!this.publisherConfirms, "Cannot set both publisherConfirms and simplePublisherConfirms");
396-
this.simplePublisherConfirms = simplePublisherConfirms;
397-
if (this.publisherConnectionFactory != null) {
398-
this.publisherConnectionFactory.setSimplePublisherConfirms(simplePublisherConfirms);
399-
}
423+
Assert.isTrue(!ConfirmType.CORRELATED.equals(this.confirmType),
424+
"Cannot set both publisherConfirms and simplePublisherConfirms");
425+
setPublisherConfirmType(ConfirmType.SIMPLE);
400426
}
401427

402428
@Override
403429
public boolean isSimplePublisherConfirms() {
404-
return this.simplePublisherConfirms;
430+
return this.confirmType.equals(ConfirmType.SIMPLE);
431+
}
432+
433+
/**
434+
* Set the confirm type to use; default {@link ConfirmType#NONE}.
435+
* @param confirmType the confirm type.
436+
* @since 2.2
437+
*/
438+
public void setPublisherConfirmType(ConfirmType confirmType) {
439+
Assert.notNull(confirmType, "'confirmType' cannot be null");
440+
this.confirmType = confirmType;
441+
if (this.publisherConnectionFactory != null) {
442+
this.publisherConnectionFactory.setPublisherConfirmType(confirmType);
443+
}
405444
}
406445

407446
/**
@@ -630,7 +669,7 @@ private ChannelProxy getCachedChannelProxy(ChannelCachingConnectionProxy connect
630669
}
631670
getChannelListener().onCreate(targetChannel, transactional);
632671
Class<?>[] interfaces;
633-
if (this.publisherConfirms || this.publisherReturns) {
672+
if (ConfirmType.CORRELATED.equals(this.confirmType) || this.publisherReturns) {
634673
interfaces = new Class<?>[] { ChannelProxy.class, PublisherCallbackChannel.class };
635674
}
636675
else {
@@ -672,15 +711,15 @@ else if (this.cacheMode == CacheMode.CONNECTION) {
672711

673712
private Channel doCreateBareChannel(ChannelCachingConnectionProxy conn, boolean transactional) {
674713
Channel channel = conn.createBareChannel(transactional);
675-
if (this.publisherConfirms || this.simplePublisherConfirms) {
714+
if (!ConfirmType.NONE.equals(this.confirmType)) {
676715
try {
677716
channel.confirmSelect();
678717
}
679718
catch (IOException e) {
680719
logger.error("Could not configure the channel to receive publisher confirms", e);
681720
}
682721
}
683-
if ((this.publisherConfirms || this.publisherReturns)
722+
if ((ConfirmType.CORRELATED.equals(this.confirmType) || this.publisherReturns)
684723
&& !(channel instanceof PublisherCallbackChannelImpl)) {
685724
channel = this.publisherChannelFactory.createChannel(channel, getChannelsExecutor());
686725
}
@@ -1037,9 +1076,10 @@ private final class CachedChannelInvocationHandler implements InvocationHandler
10371076

10381077
private final boolean transactional;
10391078

1040-
private final boolean confirmSelected = CachingConnectionFactory.this.simplePublisherConfirms;
1079+
private final boolean confirmSelected = ConfirmType.SIMPLE.equals(CachingConnectionFactory.this.confirmType);
10411080

1042-
private final boolean publisherConfirms = CachingConnectionFactory.this.publisherConfirms;
1081+
private final boolean publisherConfirms =
1082+
ConfirmType.CORRELATED.equals(CachingConnectionFactory.this.confirmType);
10431083

10441084
private volatile Channel target;
10451085

@@ -1285,7 +1325,7 @@ private void physicalClose(Object proxy) throws IOException, TimeoutException {
12851325
boolean async = false;
12861326
try {
12871327
if (CachingConnectionFactory.this.active &&
1288-
(CachingConnectionFactory.this.publisherConfirms ||
1328+
(ConfirmType.CORRELATED.equals(CachingConnectionFactory.this.confirmType) ||
12891329
CachingConnectionFactory.this.publisherReturns)) {
12901330
async = true;
12911331
asyncClose(proxy);
@@ -1317,7 +1357,7 @@ private void asyncClose(Object proxy) {
13171357
try {
13181358
executorService.execute(() -> {
13191359
try {
1320-
if (CachingConnectionFactory.this.publisherConfirms) {
1360+
if (ConfirmType.CORRELATED.equals(CachingConnectionFactory.this.confirmType)) {
13211361
channel.waitForConfirmsOrDie(ASYNC_CLOSE_TIMEOUT);
13221362
}
13231363
else {

spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-2.2.xsd

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1454,10 +1454,21 @@
14541454
<xsd:attribute name="publisher-confirms" type="xsd:string" use="optional">
14551455
<xsd:annotation>
14561456
<xsd:documentation><![CDATA[
1457-
When true, channels on connections created by this factory support publisher confirms.
1457+
When true, channels on connections created by this factory support correlated publisher confirms.
1458+
DEPRECATED in favor of 'confirm-type'.
14581459
]]></xsd:documentation>
14591460
</xsd:annotation>
14601461
</xsd:attribute>
1462+
<xsd:attribute name="confirm-type" use="optional">
1463+
<xsd:annotation>
1464+
<xsd:documentation><![CDATA[
1465+
The type of publisher confirmation to use, or NONE to disable.
1466+
]]></xsd:documentation>
1467+
</xsd:annotation>
1468+
<xsd:simpleType>
1469+
<xsd:union memberTypes="confirmTypes xsd:string"/>
1470+
</xsd:simpleType>
1471+
</xsd:attribute>
14611472
<xsd:attribute name="publisher-returns" type="xsd:string" use="optional">
14621473
<xsd:annotation>
14631474
<xsd:documentation><![CDATA[
@@ -1594,6 +1605,14 @@
15941605
</xsd:restriction>
15951606
</xsd:simpleType>
15961607

1608+
<xsd:simpleType name="confirmTypes">
1609+
<xsd:restriction base="xsd:token">
1610+
<xsd:enumeration value="SIMPLE"/>
1611+
<xsd:enumeration value="CORRELATED"/>
1612+
<xsd:enumeration value="NONE"/>
1613+
</xsd:restriction>
1614+
</xsd:simpleType>
1615+
15971616
<xsd:simpleType name="containerTypes">
15981617
<xsd:restriction base="xsd:token">
15991618
<xsd:enumeration value="simple" />

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture;
4343
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
4444
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
45+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
4546
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
4647
import org.springframework.amqp.rabbit.core.RabbitAdmin;
4748
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -463,7 +464,7 @@ public AtomicReference<CountDownLatch> latch() {
463464
@Bean
464465
public ConnectionFactory connectionFactory() {
465466
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
466-
connectionFactory.setPublisherConfirms(true);
467+
connectionFactory.setPublisherConfirmType(ConfirmType.CORRELATED);
467468
connectionFactory.setPublisherReturns(true);
468469
return connectionFactory;
469470
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParserTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.junit.Test;
2626

2727
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
28+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
2829
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
2930
import org.springframework.amqp.utils.test.TestUtils;
3031
import org.springframework.beans.DirectFieldAccessor;
@@ -61,7 +62,7 @@ public void testKitchenSink() throws Exception {
6162
assertThat(connectionFactory.getChannelCacheSize()).isEqualTo(10);
6263
DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory);
6364
assertThat(dfa.getPropertyValue("executorService")).isNull();
64-
assertThat(dfa.getPropertyValue("publisherConfirms")).isEqualTo(Boolean.TRUE);
65+
assertThat(dfa.getPropertyValue("confirmType")).isEqualTo(ConfirmType.CORRELATED);
6566
assertThat(dfa.getPropertyValue("publisherReturns")).isEqualTo(Boolean.TRUE);
6667
assertThat(TestUtils.getPropertyValue(connectionFactory, "rabbitConnectionFactory.requestedHeartbeat")).isEqualTo(123);
6768
assertThat(TestUtils.getPropertyValue(connectionFactory, "rabbitConnectionFactory.connectionTimeout")).isEqualTo(789);
@@ -87,7 +88,7 @@ public void testWithExecutor() throws Exception {
8788
ThreadPoolTaskExecutor exec = beanFactory.getBean("exec", ThreadPoolTaskExecutor.class);
8889
assertThat(executor).isSameAs(exec.getThreadPoolExecutor());
8990
DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory);
90-
assertThat(dfa.getPropertyValue("publisherConfirms")).isEqualTo(Boolean.FALSE);
91+
assertThat(dfa.getPropertyValue("confirmType")).isEqualTo(ConfirmType.NONE);
9192
assertThat(dfa.getPropertyValue("publisherReturns")).isEqualTo(Boolean.FALSE);
9293
assertThat(connectionFactory.getCacheMode()).isEqualTo(CachingConnectionFactory.CacheMode.CONNECTION);
9394
assertThat(TestUtils.getPropertyValue(connectionFactory, "rabbitConnectionFactory.connectionTimeout")).isEqualTo(new ConnectionFactory().getConnectionTimeout());
@@ -103,6 +104,8 @@ public void testWithExecutorService() throws Exception {
103104
assertThat(executor).isNotNull();
104105
ExecutorService exec = beanFactory.getBean("execService", ExecutorService.class);
105106
assertThat(executor).isSameAs(exec);
107+
DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory);
108+
assertThat(dfa.getPropertyValue("confirmType")).isEqualTo(ConfirmType.SIMPLE);
106109
}
107110

108111
@Test

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.springframework.amqp.AmqpConnectException;
7474
import org.springframework.amqp.AmqpTimeoutException;
7575
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
76+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
7677
import org.springframework.amqp.rabbit.core.RabbitTemplate;
7778
import org.springframework.amqp.utils.test.TestUtils;
7879
import org.springframework.context.ApplicationContext;
@@ -623,7 +624,7 @@ private void testCheckoutLimitWithPublisherConfirms(boolean physicalClose) throw
623624
ccf.setExecutor(exec);
624625
ccf.setChannelCacheSize(1);
625626
ccf.setChannelCheckoutTimeout(1);
626-
ccf.setPublisherConfirms(true);
627+
ccf.setPublisherConfirmType(ConfirmType.CORRELATED);
627628

628629
final Connection con = ccf.createConnection();
629630

@@ -692,7 +693,7 @@ public void testCheckoutLimitWithPublisherConfirmsLogicalAlreadyCloses() throws
692693
ccf.setExecutor(mock(ExecutorService.class));
693694
ccf.setChannelCacheSize(1);
694695
ccf.setChannelCheckoutTimeout(1);
695-
ccf.setPublisherConfirms(true);
696+
ccf.setPublisherConfirmType(ConfirmType.CORRELATED);
696697

697698
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
698699
rabbitTemplate.convertAndSend("foo", "bar");
@@ -1579,7 +1580,9 @@ private void testConsumerChannelPhysicallyClosedWhenNotIsOpenGuts(boolean confir
15791580

15801581
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
15811582
ccf.setExecutor(executor);
1582-
ccf.setPublisherConfirms(confirms);
1583+
if (confirms) {
1584+
ccf.setPublisherConfirmType(ConfirmType.CORRELATED);
1585+
}
15831586
Connection con = ccf.createConnection();
15841587

15851588
Channel channel = con.createChannel(false);
@@ -1746,7 +1749,7 @@ public void testOrderlyShutDown() throws Exception {
17461749
given(mockConnection.isOpen()).willReturn(true);
17471750

17481751
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
1749-
ccf.setPublisherConfirms(true);
1752+
ccf.setPublisherConfirmType(ConfirmType.CORRELATED);
17501753
ApplicationContext ac = mock(ApplicationContext.class);
17511754
ccf.setApplicationContext(ac);
17521755
PublisherCallbackChannel pcc = mock(PublisherCallbackChannel.class);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.springframework.amqp.core.ReceiveAndReplyMessageCallback;
7373
import org.springframework.amqp.core.ReplyToAddressCallback;
7474
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
75+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
7576
import org.springframework.amqp.rabbit.connection.ChannelListener;
7677
import org.springframework.amqp.rabbit.connection.ClosingRecoveryListener;
7778
import org.springframework.amqp.rabbit.connection.Connection;
@@ -1601,7 +1602,7 @@ public void testInvoke() {
16011602

16021603
@Test
16031604
public void waitForConfirms() {
1604-
this.connectionFactory.setPublisherConfirms(true);
1605+
this.connectionFactory.setPublisherConfirmType(ConfirmType.CORRELATED);
16051606
Collection<?> messages = getMessagesToSend();
16061607
Boolean result = this.template.invoke(t -> {
16071608
messages.forEach(m -> t.convertAndSend(ROUTE, m));

0 commit comments

Comments
 (0)