Skip to content

Commit 3da5931

Browse files
garyrussellartembilan
authored andcommitted
AMQP-806: Simple Publisher Confirms
JIRA: https://jira.spring.io/browse/AMQP-806 Avoid complex infrastructure for simple use cases. * Simple polishing
1 parent 450d27f commit 3da5931

File tree

7 files changed

+237
-22
lines changed

7 files changed

+237
-22
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class MessageProperties implements Serializable {
6262

6363
public static final Integer DEFAULT_PRIORITY = 0;
6464

65-
private final Map<String, Object> headers = new HashMap<String, Object>();
65+
private final Map<String, Object> headers = new HashMap<>();
6666

6767
private volatile Date timestamp;
6868

@@ -120,6 +120,8 @@ public class MessageProperties implements Serializable {
120120

121121
private volatile boolean finalRetryForMessageWithNoId;
122122

123+
private volatile long publishSequenceNumber;
124+
123125
private volatile transient Type inferredArgumentType;
124126

125127
private volatile transient Method targetMethod;
@@ -460,6 +462,24 @@ public void setFinalRetryForMessageWithNoId(boolean finalRetryForMessageWithNoId
460462
this.finalRetryForMessageWithNoId = finalRetryForMessageWithNoId;
461463
}
462464

465+
/**
466+
* Return the publish sequence number if publisher confirms are enabled; set by the template.
467+
* @return the sequence number.
468+
* @since 2.1
469+
*/
470+
public long getPublishSequenceNumber() {
471+
return this.publishSequenceNumber;
472+
}
473+
474+
/**
475+
* Set the publish sequence number, if publisher confirms are enabled; set by the template.
476+
* @param publishSequenceNumber the sequence number.
477+
* @since 2.1
478+
*/
479+
public void setPublishSequenceNumber(long publishSequenceNumber) {
480+
this.publishSequenceNumber = publishSequenceNumber;
481+
}
482+
463483
/**
464484
* The inferred target argument type when using a method-level
465485
* {@code @RabbitListener}.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -142,34 +142,35 @@ public enum CacheMode {
142142

143143
private final CachingConnectionFactory publisherConnectionFactory;
144144

145-
private volatile long channelCheckoutTimeout = 0;
145+
/** Synchronization monitor for the shared Connection. */
146+
private final Object connectionMonitor = new Object();
146147

147-
private volatile CacheMode cacheMode = CacheMode.CHANNEL;
148+
/** Executor used for deferred close if no explicit executor set. */
149+
private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool();
148150

149-
private volatile int channelCacheSize = DEFAULT_CHANNEL_CACHE_SIZE;
151+
private long channelCheckoutTimeout = 0;
150152

151-
private volatile int connectionCacheSize = 1;
153+
private CacheMode cacheMode = CacheMode.CHANNEL;
152154

153-
private volatile int connectionLimit = Integer.MAX_VALUE;
155+
private int channelCacheSize = DEFAULT_CHANNEL_CACHE_SIZE;
154156

155-
private volatile boolean active = true;
157+
private int connectionCacheSize = 1;
156158

157-
private volatile boolean publisherConfirms;
159+
private int connectionLimit = Integer.MAX_VALUE;
158160

159-
private volatile boolean publisherReturns;
161+
private boolean publisherConfirms;
160162

161-
private volatile boolean initialized;
163+
private boolean simplePublisherConfirms;
162164

163-
private volatile boolean stopped;
165+
private boolean publisherReturns;
164166

165-
private volatile ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();
167+
private ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();
166168

167-
/** Synchronization monitor for the shared Connection. */
168-
private final Object connectionMonitor = new Object();
169+
private volatile boolean active = true;
169170

170-
/** Executor used for deferred close if no explicit executor set. */
171-
private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool();
171+
private volatile boolean initialized;
172172

173+
private volatile boolean stopped;
173174

174175
/**
175176
* Create a new CachingConnectionFactory initializing the hostname to be the value returned from
@@ -340,13 +341,39 @@ public void setPublisherReturns(boolean publisherReturns) {
340341
}
341342
}
342343

344+
/**
345+
* Use full publisher confirms, with correlation data and a callback for each message.
346+
* @param publisherConfirms true for full publisher returns,
347+
* @since 1.1
348+
* @see #setSimplePublisherConfirms(boolean)
349+
*/
343350
public void setPublisherConfirms(boolean publisherConfirms) {
351+
Assert.isTrue(!this.simplePublisherConfirms, "Cannot set both publisherConfirms and simplePublisherConfirms");
344352
this.publisherConfirms = publisherConfirms;
345353
if (this.publisherConnectionFactory != null) {
346354
this.publisherConnectionFactory.setPublisherConfirms(publisherConfirms);
347355
}
348356
}
349357

358+
/**
359+
* Use simple publisher confirms where the template simply waits for completion.
360+
* @param simplePublisherConfirms true for confirms.
361+
* @since 2.1
362+
* @see #setPublisherConfirms(boolean)
363+
*/
364+
public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
365+
Assert.isTrue(!this.publisherConfirms, "Cannot set both publisherConfirms and simplePublisherConfirms");
366+
this.simplePublisherConfirms = simplePublisherConfirms;
367+
if (this.publisherConnectionFactory != null) {
368+
this.publisherConnectionFactory.setSimplePublisherConfirms(simplePublisherConfirms);
369+
}
370+
}
371+
372+
@Override
373+
public boolean isSimplePublisherConfirms() {
374+
return this.simplePublisherConfirms;
375+
}
376+
350377
/**
351378
* Sets the channel checkout timeout. When greater than 0, enables channel limiting
352379
* in that the {@link #channelCacheSize} becomes the total number of available channels per
@@ -583,7 +610,7 @@ else if (this.cacheMode == CacheMode.CONNECTION) {
583610

584611
private Channel doCreateBareChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
585612
Channel channel = connection.createBareChannel(transactional);
586-
if (this.publisherConfirms) {
613+
if (this.publisherConfirms || this.simplePublisherConfirms) {
587614
try {
588615
channel.confirmSelect();
589616
}
@@ -894,6 +921,8 @@ private final class CachedChannelInvocationHandler implements InvocationHandler
894921

895922
private final boolean transactional;
896923

924+
private volatile boolean confirmSelected = CachingConnectionFactory.this.simplePublisherConfirms;
925+
897926
private volatile Channel target;
898927

899928
private volatile boolean txStarted;
@@ -956,6 +985,9 @@ else if (methodName.equals("isOpen")) {
956985
else if (methodName.equals("isTransactional")) {
957986
return this.transactional;
958987
}
988+
else if (methodName.equals("isConfirmSelected")) {
989+
return this.confirmSelected;
990+
}
959991
try {
960992
if (this.target == null || !this.target.isOpen()) {
961993
if (this.target instanceof PublisherCallbackChannel) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ChannelProxy.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,4 +42,13 @@ public interface ChannelProxy extends Channel {
4242
*/
4343
boolean isTransactional();
4444

45+
/**
46+
* Return true if confirms are selected on this channel.
47+
* @return true if confirms selected.
48+
* @since 2.1
49+
*/
50+
default boolean isConfirmSelected() {
51+
return false;
52+
}
53+
4554
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,4 +56,13 @@ public interface ConnectionFactory {
5656
return null;
5757
}
5858

59+
/**
60+
* Return true if simple publisher confirms are enabled.
61+
* @return simplePublisherConfirms
62+
* @since 2.1
63+
*/
64+
default boolean isSimplePublisherConfirms() {
65+
return false;
66+
}
67+
5968
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import com.rabbitmq.client.AMQP.BasicProperties;
9797
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
9898
import com.rabbitmq.client.Channel;
99+
import com.rabbitmq.client.ConfirmListener;
99100
import com.rabbitmq.client.DefaultConsumer;
100101
import com.rabbitmq.client.Envelope;
101102
import com.rabbitmq.client.GetResponse;
@@ -1848,6 +1849,22 @@ private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionF
18481849

18491850
@Override
18501851
public <T> T invoke(OperationsCallback<T> action) {
1852+
return invoke(action, null, null);
1853+
}
1854+
1855+
/**
1856+
* Invoke operations on the same channel.
1857+
* If callbacks are needed, both callbacks must be supplied.
1858+
* @param action the callback.
1859+
* @param acks a confirm callback for acks.
1860+
* @param nacks a confirm callback for nacks.
1861+
* @param <T> the return type.
1862+
* @return the result of the action method.
1863+
* @since 2.1
1864+
*/
1865+
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
1866+
com.rabbitmq.client.ConfirmCallback nacks) {
1867+
18511868
final Channel currentChannel = this.dedicatedChannels.get();
18521869
Assert.state(currentChannel == null, () -> "Nested invoke() calls are not supported; channel '" + currentChannel
18531870
+ "' is already associated with this thread");
@@ -1881,10 +1898,18 @@ public <T> T invoke(OperationsCallback<T> action) {
18811898
throw e;
18821899
}
18831900
}
1901+
ConfirmListener listener = null;
1902+
if (acks != null && nacks != null && channel instanceof ChannelProxy
1903+
&& ((ChannelProxy) channel).isConfirmSelected()) {
1904+
listener = channel.addConfirmListener(acks, nacks);
1905+
}
18841906
try {
18851907
return action.doInRabbit(this);
18861908
}
18871909
finally {
1910+
if (listener != null) {
1911+
channel.removeConfirmListener(listener);
1912+
}
18881913
this.activeTemplateCallbacks.decrementAndGet();
18891914
this.dedicatedChannels.remove();
18901915
if (resourceHolder != null) {
@@ -2004,9 +2029,15 @@ private void setupConfirm(Channel channel, Message message, CorrelationData corr
20042029
correlationData = this.correlationDataPostProcessor != null
20052030
? this.correlationDataPostProcessor.postProcess(message, correlationData)
20062031
: correlationData;
2007-
publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(),
2032+
long nextPublishSeqNo = channel.getNextPublishSeqNo();
2033+
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
2034+
publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo,
20082035
new PendingConfirm(correlationData, System.currentTimeMillis()));
20092036
}
2037+
else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) {
2038+
long nextPublishSeqNo = channel.getNextPublishSeqNo();
2039+
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
2040+
}
20102041
}
20112042

20122043
/**
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.core;
18+
19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.junit.Assert.assertThat;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
26+
import org.junit.jupiter.api.Test;
27+
28+
import org.springframework.amqp.core.MessageProperties;
29+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
30+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
31+
32+
/**
33+
* @author Gary Russell
34+
*
35+
* @since 2.1
36+
*
37+
*/
38+
@RabbitAvailable(queues = SimplePublisherConfirmsTests.QUEUE)
39+
public class SimplePublisherConfirmsTests {
40+
41+
public static final String QUEUE = "simple.confirms";
42+
43+
@Test
44+
public void testConfirms() {
45+
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
46+
cf.setSimplePublisherConfirms(true);
47+
RabbitTemplate template = new RabbitTemplate(cf);
48+
template.setRoutingKey(QUEUE);
49+
assertTrue(template.invoke(t -> {
50+
template.convertAndSend("foo");
51+
template.convertAndSend("bar");
52+
template.waitForConfirmsOrDie(10_000);
53+
return true;
54+
}));
55+
cf.destroy();
56+
}
57+
58+
@Test
59+
public void testConfirmsWithCallbacks() {
60+
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
61+
cf.setSimplePublisherConfirms(true);
62+
RabbitTemplate template = new RabbitTemplate(cf);
63+
template.setRoutingKey(QUEUE);
64+
AtomicReference<MessageProperties> finalProperties = new AtomicReference<>();
65+
AtomicLong lastAck = new AtomicLong();
66+
assertTrue(template.invoke(t -> {
67+
template.convertAndSend("foo");
68+
template.convertAndSend("bar", m -> {
69+
finalProperties.set(m.getMessageProperties());
70+
return m;
71+
});
72+
template.waitForConfirmsOrDie(10_000);
73+
return true;
74+
}, (tag, multiple) -> {
75+
lastAck.set(tag);
76+
}, (tag, multiple) -> { }));
77+
assertThat(lastAck.get(), equalTo(finalProperties.get().getPublishSequenceNumber()));
78+
cf.destroy();
79+
}
80+
81+
}

0 commit comments

Comments
 (0)