Skip to content

Commit dc457b8

Browse files
authored
Add CorrelationData spring-messaging header
In preparation for spring-attic/spring-cloud-stream-binder-rabbit#303 * Get header once only. * Close test connection factories.
1 parent 36e31be commit dc457b8

File tree

7 files changed

+147
-13
lines changed

7 files changed

+147
-13
lines changed

spring-amqp/src/main/java/org/springframework/amqp/support/AmqpHeaders.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ public abstract class AmqpHeaders {
8989

9090
public static final String SPRING_REPLY_TO_STACK = PREFIX + "springReplyToStack";
9191

92+
/**
93+
* A CorrelationData instance for publisher confirms (not mapped).
94+
* @since 2.3
95+
*/
96+
public static final String PUBLISH_CONFIRM_CORRELATION = PREFIX + "publishConfirmCorrelation";
97+
9298
public static final String PUBLISH_CONFIRM = PREFIX + "publishConfirm";
9399

94100
public static final String PUBLISH_CONFIRM_NACK_CAUSE = PREFIX + "publishConfirmNackCause";

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CorrelationData.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,11 +22,14 @@
2222
import org.springframework.util.concurrent.SettableListenableFuture;
2323

2424
/**
25-
* Base class for correlating publisher confirms to sent messages.
26-
* Use the {@link org.springframework.amqp.rabbit.core.RabbitTemplate}
27-
* methods that include one of
28-
* these as a parameter; when the publisher confirm is received,
29-
* the CorrelationData is returned with the ack/nack.
25+
* Base class for correlating publisher confirms to sent messages. Use the
26+
* {@link org.springframework.amqp.rabbit.core.RabbitTemplate} methods that include one of
27+
* these as a parameter; when the publisher confirm is received, the CorrelationData is
28+
* returned with the ack/nack. When returns are also enabled, the
29+
* {@link #setReturnedMessage(Message) returnedMessage} property will be populated when a
30+
* message can't be delivered - the return always arrives before the confirmation. In this
31+
* case the {@code #id} property must be set to a unique value.
32+
*
3033
* @author Gary Russell
3134
* @since 1.0.1
3235
*

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitMessagingTemplate.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@
1919
import java.util.Map;
2020

2121
import org.springframework.amqp.core.MessageProperties;
22+
import org.springframework.amqp.rabbit.connection.CorrelationData;
23+
import org.springframework.amqp.support.AmqpHeaders;
2224
import org.springframework.amqp.support.converter.MessageConverter;
2325
import org.springframework.amqp.support.converter.MessagingMessageConverter;
2426
import org.springframework.beans.factory.InitializingBean;
@@ -195,7 +197,13 @@ public <T> T convertSendAndReceive(String exchange, String routingKey, Object re
195197
@Override
196198
protected void doSend(String destination, Message<?> message) {
197199
try {
198-
this.rabbitTemplate.send(destination, createMessage(message));
200+
Object correlation = message.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION);
201+
if (correlation instanceof CorrelationData) {
202+
this.rabbitTemplate.send(destination, createMessage(message), (CorrelationData) correlation);
203+
}
204+
else {
205+
this.rabbitTemplate.send(destination, createMessage(message));
206+
}
199207
}
200208
catch (RuntimeException ex) {
201209
throw convertAmqpException(ex);
@@ -204,7 +212,13 @@ protected void doSend(String destination, Message<?> message) {
204212

205213
protected void doSend(String exchange, String routingKey, Message<?> message) {
206214
try {
207-
this.rabbitTemplate.send(exchange, routingKey, createMessage(message));
215+
Object correlation = message.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION);
216+
if (correlation instanceof CorrelationData) {
217+
this.rabbitTemplate.send(exchange, routingKey, createMessage(message), (CorrelationData) correlation);
218+
}
219+
else {
220+
this.rabbitTemplate.send(exchange, routingKey, createMessage(message));
221+
}
208222
}
209223
catch (RuntimeException ex) {
210224
throw convertAmqpException(ex);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitOperations.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,21 @@ <T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.Confirm
108108
*/
109109
ConnectionFactory getConnectionFactory();
110110

111+
/**
112+
* Send a message to the default exchange with a specific routing key.
113+
*
114+
* @param routingKey the routing key
115+
* @param message a message to send
116+
* @param correlationData data to correlate publisher confirms.
117+
* @throws AmqpException if there is a problem
118+
* @since 2.3
119+
*/
120+
default void send(String routingKey, Message message, CorrelationData correlationData)
121+
throws AmqpException {
122+
123+
throw new UnsupportedOperationException("This implementation does not support this method");
124+
}
125+
111126
/**
112127
* Send a message to a specific exchange with a specific routing key.
113128
*

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,11 @@ public void send(String routingKey, Message message) throws AmqpException {
10321032
send(this.exchange, routingKey, message);
10331033
}
10341034

1035+
@Override
1036+
public void send(String routingKey, Message message, CorrelationData correlationData) throws AmqpException {
1037+
send(this.exchange, routingKey, message, correlationData);
1038+
}
1039+
10351040
@Override
10361041
public void send(final String exchange, final String routingKey, final Message message) throws AmqpException {
10371042
send(exchange, routingKey, message, null);
@@ -1045,8 +1050,7 @@ public void send(final String exchange, final String routingKey,
10451050
doSend(channel, exchange, routingKey, message,
10461051
(RabbitTemplate.this.returnsCallback != null
10471052
|| (correlationData != null && StringUtils.hasText(correlationData.getId())))
1048-
&& RabbitTemplate.this.mandatoryExpression.getValue(
1049-
RabbitTemplate.this.evaluationContext, message, Boolean.class),
1053+
&& isMandatoryFor(message),
10501054
correlationData);
10511055
return null;
10521056
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.core;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.Collections;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
27+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
28+
import org.springframework.amqp.rabbit.connection.CorrelationData;
29+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
30+
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
31+
import org.springframework.amqp.support.AmqpHeaders;
32+
import org.springframework.messaging.support.GenericMessage;
33+
34+
/**
35+
* @author Gary Russell
36+
* @since 2.3
37+
*
38+
*/
39+
@RabbitAvailable("messaging.confirms")
40+
public class MessagingTemplateConfirmsTests {
41+
42+
@Test
43+
void confirmHeader() throws Exception {
44+
CachingConnectionFactory ccf = new CachingConnectionFactory(
45+
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
46+
ccf.setPublisherConfirmType(ConfirmType.CORRELATED);
47+
RabbitTemplate rt = new RabbitTemplate(ccf);
48+
RabbitMessagingTemplate rmt = new RabbitMessagingTemplate(rt);
49+
CorrelationData data = new CorrelationData();
50+
rmt.send("messaging.confirms",
51+
new GenericMessage<>("foo", Collections.singletonMap(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, data)));
52+
assertThat(data.getFuture().get(10, TimeUnit.SECONDS).isAck()).isTrue();
53+
ccf.destroy();
54+
}
55+
56+
@Test
57+
void confirmHeaderUnroutable() throws Exception {
58+
CachingConnectionFactory ccf = new CachingConnectionFactory(
59+
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
60+
ccf.setPublisherConfirmType(ConfirmType.CORRELATED);
61+
ccf.setPublisherReturns(true);
62+
RabbitTemplate rt = new RabbitTemplate(ccf);
63+
rt.setMandatory(true);
64+
RabbitMessagingTemplate rmt = new RabbitMessagingTemplate(rt);
65+
CorrelationData data = new CorrelationData("foo");
66+
rmt.send("messaging.confirms.unroutable",
67+
new GenericMessage<>("foo", Collections.singletonMap(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, data)));
68+
assertThat(data.getFuture().get(10, TimeUnit.SECONDS).isAck()).isTrue();
69+
assertThat(data.getReturnedMessage()).isNotNull();
70+
ccf.destroy();
71+
}
72+
73+
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitMessagingTemplateTests.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import static org.mockito.Mockito.verify;
2929

3030
import java.io.Writer;
31+
import java.util.Collections;
3132
import java.util.HashMap;
3233
import java.util.Map;
3334

@@ -40,14 +41,17 @@
4041

4142
import org.springframework.amqp.core.MessageProperties;
4243
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
44+
import org.springframework.amqp.rabbit.connection.CorrelationData;
4345
import org.springframework.amqp.rabbit.test.MessageTestUtils;
46+
import org.springframework.amqp.support.AmqpHeaders;
4447
import org.springframework.amqp.support.converter.MessageConversionException;
4548
import org.springframework.amqp.support.converter.MessageConverter;
4649
import org.springframework.amqp.support.converter.MessagingMessageConverter;
4750
import org.springframework.amqp.support.converter.SimpleMessageConverter;
4851
import org.springframework.amqp.utils.test.TestUtils;
4952
import org.springframework.messaging.Message;
5053
import org.springframework.messaging.converter.GenericMessageConverter;
54+
import org.springframework.messaging.support.GenericMessage;
5155
import org.springframework.messaging.support.MessageBuilder;
5256

5357

@@ -83,7 +87,8 @@ public void verifyConverter() {
8387
RabbitTemplate template = new RabbitTemplate(mock(ConnectionFactory.class));
8488
RabbitMessagingTemplate rmt = new RabbitMessagingTemplate(template);
8589
rmt.afterPropertiesSet();
86-
assertThat(TestUtils.getPropertyValue(rmt, "amqpMessageConverter.payloadConverter")).isSameAs(template.getMessageConverter());
90+
assertThat(TestUtils.getPropertyValue(rmt, "amqpMessageConverter.payloadConverter"))
91+
.isSameAs(template.getMessageConverter());
8792

8893
rmt = new RabbitMessagingTemplate(template);
8994
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
@@ -94,6 +99,20 @@ public void verifyConverter() {
9499
assertThat(TestUtils.getPropertyValue(rmt, "amqpMessageConverter.payloadConverter")).isSameAs(payloadConverter);
95100
}
96101

102+
@Test
103+
void correlation() {
104+
this.messagingTemplate.setDefaultDestination("defRk");
105+
this.messagingTemplate.send(new GenericMessage<>("foo",
106+
Collections.singletonMap(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, new CorrelationData())));
107+
verify(this.rabbitTemplate).send(eq("defRk"), any(), any(CorrelationData.class));
108+
this.messagingTemplate.send("rk", new GenericMessage<>("foo",
109+
Collections.singletonMap(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, new CorrelationData())));
110+
verify(this.rabbitTemplate).send(eq("rk"), any(), any(CorrelationData.class));
111+
this.messagingTemplate.send("ex", "rk", new GenericMessage<>("foo",
112+
Collections.singletonMap(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, new CorrelationData())));
113+
verify(this.rabbitTemplate).send(eq("ex"), eq("rk"), any(), any(CorrelationData.class));
114+
}
115+
97116
@Test
98117
public void send() {
99118
Message<String> message = createTextMessage();

0 commit comments

Comments
 (0)