Skip to content

Commit 14c1046

Browse files
committed
GH-2994, GH-2986 Add initial support for customizing Message Converter behavior
primarily during batch processing.
1 parent 4df2e76 commit 14c1046

File tree

9 files changed

+497
-6
lines changed

9 files changed

+497
-6
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
<groupId>org.springframework.cloud</groupId>
3333
<artifactId>spring-cloud-stream</artifactId>
3434
</dependency>
35+
<dependency>
36+
<groupId>org.springframework.cloud</groupId>
37+
<artifactId>spring-cloud-stream-test-binder</artifactId>
38+
<scope>test</scope>
39+
</dependency>
3540
<dependency>
3641
<groupId>org.springframework.boot</groupId>
3742
<artifactId>spring-boot-autoconfigure</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2019-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka.config;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import org.springframework.cloud.function.context.config.MessageConverterHelper;
22+
import org.springframework.messaging.Message;
23+
24+
/**
25+
* @author Oleg Zhurakousky
26+
*/
27+
public class DefaultMessageConverterHelper implements MessageConverterHelper {
28+
29+
@Override
30+
public boolean shouldFailIfCantConvert(Message<?> message) {
31+
return false;
32+
}
33+
34+
public void postProcessBatchMessageOnFailure(Message<?> message, int index) {
35+
AtomicInteger deliveryAttempt = (AtomicInteger) message.getHeaders().get("deliveryAttempt");
36+
// if (message.getHeaders().containsKey("amqp_batchedHeaders") && deliveryAttempt != null && deliveryAttempt.get() == 1) {
37+
// ArrayList<?> list = (ArrayList<?>) message.getHeaders().get("amqp_batchedHeaders");
38+
// list.remove(index);
39+
// }
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright 2019-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
21+
import java.util.LinkedHashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.function.Function;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
30+
import org.springframework.cloud.function.context.config.MessageConverterHelper;
31+
import org.springframework.cloud.function.json.JacksonMapper;
32+
import org.springframework.cloud.stream.binder.test.InputDestination;
33+
import org.springframework.cloud.stream.binder.test.OutputDestination;
34+
import org.springframework.cloud.stream.binder.test.TestChannelBinder;
35+
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
36+
import org.springframework.context.annotation.Bean;
37+
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.integration.support.MessageBuilder;
39+
import org.springframework.messaging.Message;
40+
import org.springframework.messaging.MessageHandlingException;
41+
import org.springframework.messaging.converter.MessageConversionException;
42+
43+
import static org.assertj.core.api.Assertions.assertThat;
44+
45+
/**
46+
*
47+
*/
48+
public class FunctionBatchingConversionTests {
49+
50+
@SuppressWarnings("unchecked")
51+
// @Test
52+
void testBatchHeadersMatchingPayload() {
53+
TestChannelBinderConfiguration.applicationContextRunner(BatchFunctionConfiguration.class)
54+
.withPropertyValues("spring.cloud.stream.function.definition=func",
55+
"spring.cloud.stream.bindings.func-in-0.consumer.batch-mode=true",
56+
"spring.cloud.stream.rabbit.bindings.func-in-0.consumer.enable-batching=true")
57+
.run(context -> {
58+
InputDestination inputDestination = context.getBean(InputDestination.class);
59+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
60+
61+
List<byte[]> payloads = List.of("hello".getBytes(StandardCharsets.UTF_8),
62+
"{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8),
63+
"{\"name\":\"Julien\"}".getBytes(StandardCharsets.UTF_8),
64+
"{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8),
65+
"hello".getBytes(StandardCharsets.UTF_8));
66+
List<Map<String, String>> amqpBatchHeaders = new ArrayList<>();
67+
for (int i = 0; i < 5; i++) {
68+
Map<String, String> batchHeaders = new LinkedHashMap<>();
69+
batchHeaders.put("amqp_receivedDeliveryMode", "PERSISTENT");
70+
batchHeaders.put("index", String.valueOf(i));
71+
amqpBatchHeaders.add(batchHeaders);
72+
}
73+
74+
var message = MessageBuilder.withPayload(payloads)
75+
.setHeader("amqp_batchedHeaders", amqpBatchHeaders)
76+
.setHeader("deliveryAttempt", new AtomicInteger(1)).build();
77+
inputDestination.send(message);
78+
79+
Message<byte[]> resultMessage = outputDestination.receive();
80+
JacksonMapper mapper = context.getBean(JacksonMapper.class);
81+
List<?> resultPayloads = mapper.fromJson(resultMessage.getPayload(), List.class);
82+
assertThat(resultPayloads).hasSize(3);
83+
84+
List<Map<String, String>> amqpBatchedHeaders = (List<Map<String, String>>) resultMessage
85+
.getHeaders().get("amqp_batchedHeaders");
86+
assertThat(amqpBatchedHeaders).hasSize(resultPayloads.size());
87+
assertThat(amqpBatchedHeaders.get(0).get("index")).isEqualTo("1");
88+
assertThat(amqpBatchedHeaders.get(1).get("index")).isEqualTo("2");
89+
assertThat(amqpBatchedHeaders.get(2).get("index")).isEqualTo("3");
90+
91+
context.stop();
92+
});
93+
}
94+
95+
// @Test
96+
void testBatchHeadersForcingFatalFailureOnConversiionException() {
97+
TestChannelBinderConfiguration
98+
.applicationContextRunner(BatchFunctionConfigurationWithAdditionalConversionHelper.class)
99+
.withPropertyValues("spring.cloud.stream.function.definition=func",
100+
"spring.cloud.stream.bindings.func-in-0.consumer.batch-mode=true",
101+
"spring.cloud.stream.bindings.func-in-0.consumer.max-attempts=1",
102+
"spring.cloud.stream.rabbit.bindings.func-in-0.consumer.enable-batching=true")
103+
.run(context -> {
104+
InputDestination inputDestination = context.getBean(InputDestination.class);
105+
106+
List<byte[]> payloads = List.of("hello".getBytes(StandardCharsets.UTF_8),
107+
"{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8),
108+
"{\"name\":\"Julien\"}".getBytes(StandardCharsets.UTF_8),
109+
"{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8),
110+
"hello".getBytes(StandardCharsets.UTF_8));
111+
List<Map<String, String>> amqpBatchHeaders = new ArrayList<>();
112+
for (int i = 0; i < 5; i++) {
113+
Map<String, String> batchHeaders = new LinkedHashMap<>();
114+
batchHeaders.put("amqp_receivedDeliveryMode", "PERSISTENT");
115+
batchHeaders.put("index", String.valueOf(i));
116+
amqpBatchHeaders.add(batchHeaders);
117+
}
118+
119+
var message = MessageBuilder.withPayload(payloads)
120+
.setHeader("amqp_batchedHeaders", amqpBatchHeaders)
121+
.setHeader("deliveryAttempt", new AtomicInteger(1)).build();
122+
inputDestination.send(message);
123+
TestChannelBinder binder = context.getBean(TestChannelBinder.class);
124+
assertThat(binder.getLastError().getPayload()).isInstanceOf(MessageHandlingException.class);
125+
MessageHandlingException exception = (MessageHandlingException) binder.getLastError().getPayload();
126+
assertThat(exception.getCause()).isInstanceOf(MessageConversionException.class);
127+
128+
context.stop();
129+
});
130+
}
131+
132+
@Configuration
133+
@EnableAutoConfiguration
134+
public static class BatchFunctionConfiguration {
135+
@Bean
136+
public Function<Message<List<Person>>, Message<List<Person>>> func() {
137+
return x -> {
138+
return x;
139+
};
140+
}
141+
}
142+
143+
@Configuration
144+
@EnableAutoConfiguration
145+
public static class BatchFunctionConfigurationWithAdditionalConversionHelper {
146+
147+
@Bean
148+
public MessageConverterHelper helper() {
149+
return new MessageConverterHelper() {
150+
public boolean shouldFailIfCantConvert(Message<?> message) {
151+
return true;
152+
}
153+
};
154+
}
155+
156+
@Bean
157+
public Function<Message<List<Person>>, Message<List<Person>>> func() {
158+
return x -> {
159+
return x;
160+
};
161+
}
162+
}
163+
164+
static class Person {
165+
166+
private String name;
167+
168+
public String getName() {
169+
return name;
170+
}
171+
172+
public void setName(String name) {
173+
this.name = name;
174+
}
175+
176+
public String toString() {
177+
return "name: " + name;
178+
}
179+
}
180+
}

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
<artifactId>spring-boot-configuration-processor</artifactId>
3636
<optional>true</optional>
3737
</dependency>
38+
<dependency>
39+
<groupId>org.springframework.cloud</groupId>
40+
<artifactId>spring-cloud-stream-test-binder</artifactId>
41+
<scope>test</scope>
42+
</dependency>
3843
<dependency>
3944
<groupId>org.springframework.cloud</groupId>
4045
<artifactId>spring-cloud-stream</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2019-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.rabbit.config;
18+
19+
import java.util.ArrayList;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import org.springframework.cloud.function.context.config.MessageConverterHelper;
23+
import org.springframework.messaging.Message;
24+
25+
/**
26+
* @author Oleg Zhurakousky
27+
*/
28+
public class DefaultMessageConverterHelper implements MessageConverterHelper {
29+
30+
@Override
31+
public boolean shouldFailIfCantConvert(Message<?> message) {
32+
return false;
33+
}
34+
35+
public void postProcessBatchMessageOnFailure(Message<?> message, int index) {
36+
AtomicInteger deliveryAttempt = (AtomicInteger) message.getHeaders().get("deliveryAttempt");
37+
if (message.getHeaders().containsKey("amqp_batchedHeaders") && deliveryAttempt != null && deliveryAttempt.get() == 1) {
38+
ArrayList<?> list = (ArrayList<?>) message.getHeaders().get("amqp_batchedHeaders");
39+
list.remove(index);
40+
}
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2019-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.rabbit.config;
18+
19+
import org.springframework.cloud.function.context.config.MessageConverterHelper;
20+
import org.springframework.context.annotation.Bean;
21+
import org.springframework.context.annotation.Configuration;
22+
23+
/**
24+
* @author Oleg Zhurakousky
25+
*/
26+
@Configuration(proxyBeanMethods = false)
27+
public class MessageConverterHelperConfiguration {
28+
29+
@Bean
30+
public MessageConverterHelper messageConverterHelper() {
31+
return new DefaultMessageConverterHelper();
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration
2+
org.springframework.cloud.stream.binder.rabbit.config.MessageConverterHelperConfiguration

0 commit comments

Comments
 (0)