Skip to content

Commit 144bf81

Browse files
garyrussellartembilan
authored andcommitted
GH-1219: Fix header mapping for replies (@sendto)
Resolves #1219 The headers were mapped after message conversion. This prevented using a `ContentTypeDelegatingMessageConverter` because the content type was not set. Add a header to control whether the user or converter gets to set the content type property in the final message. **cherry-pick to 2.2.x, 2.1.x, 1.7.x** # Conflicts: # spring-amqp/src/main/java/org/springframework/amqp/support/AmqpHeaders.java # spring-amqp/src/main/java/org/springframework/amqp/support/converter/MessagingMessageConverter.java # src/reference/asciidoc/amqp.adoc
1 parent e85efb4 commit 144bf81

File tree

4 files changed

+222
-5
lines changed

4 files changed

+222
-5
lines changed

spring-amqp/src/main/java/org/springframework/amqp/support/AmqpHeaders.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,8 @@ public abstract class AmqpHeaders {
4949

5050
public static final String CONTENT_TYPE = MessageHeaders.CONTENT_TYPE;
5151

52+
public static final String CONTENT_TYPE_CONVERTER_WINS = PREFIX + "contentTypeConverterWins";
53+
5254
public static final String CORRELATION_ID = PREFIX + "correlationId";
5355

5456
public static final String DELAY = PREFIX + "delay";

spring-amqp/src/main/java/org/springframework/amqp/support/converter/MessagingMessageConverter.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,9 +20,11 @@
2020

2121
import org.springframework.amqp.core.MessageProperties;
2222
import org.springframework.amqp.support.AmqpHeaderMapper;
23+
import org.springframework.amqp.support.AmqpHeaders;
2324
import org.springframework.amqp.support.SimpleAmqpHeaderMapper;
2425
import org.springframework.beans.factory.InitializingBean;
2526
import org.springframework.messaging.Message;
27+
import org.springframework.messaging.MessageHeaders;
2628
import org.springframework.messaging.support.MessageBuilder;
2729
import org.springframework.util.Assert;
2830

@@ -101,10 +103,16 @@ public org.springframework.amqp.core.Message toMessage(Object object, MessagePro
101103
Message.class.getName() + "] is handled by this converter");
102104
}
103105
Message<?> input = (Message<?>) object;
106+
this.headerMapper.fromHeaders(input.getHeaders(), messageProperties);
104107
org.springframework.amqp.core.Message amqpMessage = this.payloadConverter.toMessage(
105108
input.getPayload(), messageProperties);
106-
107-
this.headerMapper.fromHeaders(input.getHeaders(), messageProperties);
109+
// Default previous behavior of mapper wins for backwards compatibility.
110+
if (!Boolean.TRUE.equals(input.getHeaders().get(AmqpHeaders.CONTENT_TYPE_CONVERTER_WINS))) {
111+
String contentType = input.getHeaders().get(MessageHeaders.CONTENT_TYPE, String.class);
112+
if (contentType != null) {
113+
messageProperties.setContentType(contentType);
114+
}
115+
}
108116
return amqpMessage;
109117
}
110118

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.annotation;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.amqp.core.AnonymousQueue;
27+
import org.springframework.amqp.core.Message;
28+
import org.springframework.amqp.core.MessageProperties;
29+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
30+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
31+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
32+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
33+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
34+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
35+
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
36+
import org.springframework.amqp.support.AmqpHeaders;
37+
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
38+
import org.springframework.amqp.support.converter.MessageConversionException;
39+
import org.springframework.amqp.support.converter.MessageConverter;
40+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
41+
import org.springframework.beans.factory.annotation.Autowired;
42+
import org.springframework.context.annotation.Bean;
43+
import org.springframework.context.annotation.Configuration;
44+
import org.springframework.messaging.MessageHeaders;
45+
import org.springframework.messaging.handler.annotation.SendTo;
46+
import org.springframework.messaging.support.MessageBuilder;
47+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
48+
49+
/**
50+
* @author Gary Russell
51+
* @since 2.5
52+
*
53+
*/
54+
@RabbitAvailable
55+
@SpringJUnitConfig
56+
public class ContentTypeDelegatingMessageConverterIntegrationTests {
57+
58+
@Autowired
59+
private Config config;
60+
61+
@Autowired
62+
private RabbitTemplate template;
63+
64+
@Autowired
65+
private AnonymousQueue queue1;
66+
67+
@Test
68+
void testReplyContentType() throws InterruptedException {
69+
this.template.convertAndSend(this.queue1.getName(), "foo", msg -> {
70+
msg.getMessageProperties().setContentType("foo/bar");
71+
return msg;
72+
});
73+
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue();
74+
assertThat(this.config.replyContentType).isEqualTo("baz/qux");
75+
assertThat(this.config.receivedReplyContentType).isEqualTo("baz/qux");
76+
77+
this.template.convertAndSend(this.queue1.getName(), "bar", msg -> {
78+
msg.getMessageProperties().setContentType("foo/bar");
79+
return msg;
80+
});
81+
assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue();
82+
assertThat(this.config.replyContentType).isEqualTo("baz/qux");
83+
assertThat(this.config.receivedReplyContentType).isEqualTo("foo/bar");
84+
}
85+
86+
@Configuration
87+
@EnableRabbit
88+
public static class Config {
89+
90+
final CountDownLatch latch1 = new CountDownLatch(1);
91+
92+
final CountDownLatch latch2 = new CountDownLatch(2);
93+
94+
volatile String replyContentType;
95+
96+
volatile String receivedReplyContentType;
97+
98+
@Bean
99+
public ConnectionFactory cf() {
100+
return new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
101+
}
102+
103+
@Bean
104+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
105+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
106+
factory.setConnectionFactory(cf());
107+
ContentTypeDelegatingMessageConverter converter =
108+
new ContentTypeDelegatingMessageConverter(new SimpleMessageConverter());
109+
converter.addDelegate("foo/bar", new MessageConverter() {
110+
111+
@Override
112+
public Message toMessage(Object object, MessageProperties messageProperties)
113+
throws MessageConversionException {
114+
115+
return new Message("foo".getBytes(), messageProperties);
116+
}
117+
118+
@Override
119+
public Object fromMessage(Message message) throws MessageConversionException {
120+
return new String(message.getBody());
121+
}
122+
123+
});
124+
converter.addDelegate("baz/qux", new MessageConverter() {
125+
126+
@Override
127+
public Message toMessage(Object object, MessageProperties messageProperties)
128+
throws MessageConversionException {
129+
130+
Config.this.replyContentType = messageProperties.getContentType();
131+
messageProperties.setContentType("foo/bar");
132+
return new Message("foo".getBytes(), messageProperties);
133+
}
134+
135+
@Override
136+
public Object fromMessage(Message message) throws MessageConversionException {
137+
return new String(message.getBody());
138+
}
139+
140+
});
141+
factory.setMessageConverter(converter);
142+
return factory;
143+
}
144+
145+
@Bean
146+
public RabbitTemplate template() {
147+
return new RabbitTemplate(cf());
148+
}
149+
150+
@Bean
151+
public RabbitAdmin admin() {
152+
return new RabbitAdmin(cf());
153+
}
154+
155+
@Bean
156+
public AnonymousQueue queue1() {
157+
return new AnonymousQueue();
158+
}
159+
160+
@Bean
161+
public AnonymousQueue queue2() {
162+
return new AnonymousQueue();
163+
}
164+
165+
@RabbitListener(queues = "#{@queue1.name}")
166+
@SendTo("#{@queue2.name}")
167+
public org.springframework.messaging.Message<String> listen1(String in) {
168+
MessageBuilder<String> builder = MessageBuilder.withPayload(in)
169+
.setHeader(MessageHeaders.CONTENT_TYPE, "baz/qux");
170+
if ("bar".equals(in)) {
171+
builder.setHeader(AmqpHeaders.CONTENT_TYPE_CONVERTER_WINS, true);
172+
}
173+
return builder.build();
174+
}
175+
176+
@RabbitListener(queues = "#{@queue2.name}")
177+
public void listen2(Message in) {
178+
this.receivedReplyContentType = in.getMessageProperties().getContentType();
179+
this.latch1.countDown();
180+
this.latch2.countDown();
181+
}
182+
183+
}
184+
185+
}

src/reference/asciidoc/amqp.adoc

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2513,7 +2513,7 @@ public Message<OrderStatus> processOrder(Order order) {
25132513
----
25142514
====
25152515

2516-
The `@SendTo` value is assumed as a reply `exchange` and `routingKey` pair that follws the `exchange/routingKey` pattern,
2516+
The `@SendTo` value is assumed as a reply `exchange` and `routingKey` pair that follows the `exchange/routingKey` pattern,
25172517
where one of those parts can be omitted.
25182518
The valid values are as follows:
25192519

@@ -2596,6 +2596,28 @@ public String listen(Message in) {
25962596
----
25972597
====
25982598

2599+
====== Reply ContentType
2600+
2601+
If you are using a sophisticated message converter, such as the `ContentTypeDelegatingMessageConverter`, you can control the content type of the reply by returning a `spring-messaging` `Message<?>`:
2602+
2603+
====
2604+
[source, java]
2605+
----
2606+
@RabbitListener(queues = "q1")
2607+
@SendTo("q2")
2608+
public Message<String> listen(String in) {
2609+
...
2610+
return MessageBuilder.withPayload(in.toUpperCase())
2611+
.setHeader(MessageHeaders.CONTENT_TYPE, "application/xml")
2612+
.build();
2613+
}
2614+
----
2615+
====
2616+
2617+
This content type will be passed in the `MessageProperties` to the converter.
2618+
By default, for backwards compatibility, any content type property set by the converter will be overwritten by this value after conversion.
2619+
If you wish to override that behavior, also set the `AmqpHeaders.CONTENT_TYPE_CONVERTER_WINS` to `true` and any value set by the converter will be retained.
2620+
25992621
[[annotation-method-selection]]
26002622
====== Multi-method Listeners
26012623

0 commit comments

Comments
 (0)