Skip to content

Commit 505a33b

Browse files
committed
Add additional utility methods
1 parent baaf8d5 commit 505a33b

File tree

3 files changed

+101
-6
lines changed

3 files changed

+101
-6
lines changed

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/BinderHeaders.java

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ public final class BinderHeaders {
3939
IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, MessageHeaders.CONTENT_TYPE};
4040

4141
private static final String PREFIX = "scst_";
42+
43+
44+
/**
45+
* Name of the Message header identifying structure for batch Message headers.
46+
*/
47+
public static String BATCH_HEADERS = PREFIX + "batchHeaders";
4248

4349
/**
4450
* Indicates the name of the target destination the binder should use if they

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StandardBatchUtils.java

+64-4
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,81 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.HashMap;
21+
import java.util.Iterator;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.Map.Entry;
2325

26+
import org.springframework.cloud.stream.binder.BinderHeaders;
2427
import org.springframework.messaging.Message;
2528
import org.springframework.messaging.MessageHeaders;
2629
import org.springframework.messaging.support.MessageBuilder;
30+
import org.springframework.util.Assert;
2731

2832
/**
2933
* @author Oleg Zhurakousky
3034
* @since 4.2
3135
*/
32-
public class StandardBatchUtils {
36+
public final class StandardBatchUtils {
3337

34-
public static String BATCH_HEADERS = "scst_batchHeaders";
38+
private StandardBatchUtils() {
39+
40+
}
3541

42+
/**
43+
* Iterates over batch message structure returning {@link Iterable} of individual messages.
44+
*
45+
* @param batchMessage instance of batch {@link Message}
46+
* @return instance of {@link Iterable} representing individual Messages in a batch {@link Message} as {@link Entry}.
47+
*/
48+
public static Iterable<Entry<Object, Map<String, Object>>> iterate(Message<List<Object>> batchMessage) {
49+
return new Iterable<Map.Entry<Object,Map<String, Object>>>() {
50+
@Override
51+
public Iterator<Entry<Object, Map<String, Object>>> iterator() {
52+
return new Iterator<Entry<Object, Map<String, Object>>>() {
53+
int index = 0;
54+
@Override
55+
public Entry<Object, Map<String, Object>> next() {
56+
return getMessageByIndex(batchMessage, index++);
57+
}
58+
59+
@Override
60+
public boolean hasNext() {
61+
return index < batchMessage.getPayload().size();
62+
}
63+
};
64+
}
65+
};
66+
}
67+
68+
/**
69+
* Extracts individual {@link Message} by index from batch {@link Message}
70+
* @param batchMessage instance of batch {@link Message}
71+
* @param index index of individual {@link Message} in a batch
72+
* @return individual {@link Message} in a batch {@link Message}
73+
*/
74+
public static Entry<Object, Map<String, Object>> getMessageByIndex(Message<List<Object>> batchMessage, int index) {
75+
Assert.isTrue(index < batchMessage.getPayload().size(), "Index " + index + " is out of bounds as there are only "
76+
+ batchMessage.getPayload().size() + " messages in a batch.");
77+
return new Entry<Object, Map<String,Object>>() {
78+
79+
@Override
80+
public Map<String, Object> setValue(Map<String, Object> value) {
81+
throw new UnsupportedOperationException();
82+
}
83+
84+
@SuppressWarnings("unchecked")
85+
@Override
86+
public Map<String, Object> getValue() {
87+
return ((List<Map<String, Object>>) batchMessage.getHeaders().get(BinderHeaders.BATCH_HEADERS)).get(index);
88+
}
89+
90+
@Override
91+
public Object getKey() {
92+
return batchMessage.getPayload().get(index);
93+
}
94+
};
95+
}
3696

3797
public static class BatchMessageBuilder {
3898

@@ -48,13 +108,13 @@ public BatchMessageBuilder addMessage(Object payload, Map<String, Object> batchH
48108
return this;
49109
}
50110

51-
public BatchMessageBuilder addHeader(String key, Object value) {
111+
public BatchMessageBuilder addRootHeader(String key, Object value) {
52112
this.headers.put(key, value);
53113
return this;
54114
}
55115

56116
public Message<List<Object>> build() {
57-
this.headers.put(BATCH_HEADERS, this.batchHeaders);
117+
this.headers.put(BinderHeaders.BATCH_HEADERS, this.batchHeaders);
58118
return MessageBuilder.createMessage(payloads, new MessageHeaders(headers));
59119
}
60120
}

core/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StandardBatchUtilsTests.java

+31-2
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.List;
2324
import java.util.Map;
25+
import java.util.Map.Entry;
2426

2527
import org.junit.jupiter.api.Test;
28+
import org.springframework.cloud.stream.binder.BinderHeaders;
2629
import org.springframework.cloud.stream.function.StandardBatchUtils.BatchMessageBuilder;
2730
import org.springframework.messaging.Message;
2831

32+
2933
/**
3034
*
3135
*/
@@ -36,7 +40,7 @@ public class StandardBatchUtilsTests {
3640
public void testBatchMessageBuilder() {
3741
BatchMessageBuilder builder = new BatchMessageBuilder();
3842
builder.addMessage("foo", Collections.singletonMap("fooKey", "fooValue"));
39-
builder.addHeader("a", "a");
43+
builder.addRootHeader("a", "a");
4044
builder.addMessage("bar", Collections.singletonMap("barKey", "barValue"));
4145
builder.addMessage("baz", Collections.singletonMap("bazKey", "bazValue"));
4246

@@ -45,7 +49,7 @@ public void testBatchMessageBuilder() {
4549
List<Object> payloads = batchMessage.getPayload();
4650
assertThat(payloads.size()).isEqualTo(3);
4751

48-
List<Map<String, Object>> batchHeaders = (List<Map<String, Object>>) batchMessage.getHeaders().get(StandardBatchUtils.BATCH_HEADERS);
52+
List<Map<String, Object>> batchHeaders = (List<Map<String, Object>>) batchMessage.getHeaders().get(BinderHeaders.BATCH_HEADERS);
4953
assertThat(batchHeaders.size()).isEqualTo(3);
5054

5155
assertThat(payloads.get(0)).isEqualTo("foo");
@@ -56,4 +60,29 @@ public void testBatchMessageBuilder() {
5660

5761
assertThat(batchMessage.getHeaders().get("a")).isEqualTo("a");
5862
}
63+
64+
@Test
65+
public void testIterator() {
66+
BatchMessageBuilder builder = new BatchMessageBuilder();
67+
builder.addMessage("foo", Collections.singletonMap("fooKey", "fooValue"));
68+
builder.addRootHeader("a", "a");
69+
builder.addMessage("bar", Collections.singletonMap("barKey", "barValue"));
70+
builder.addMessage("baz", Collections.singletonMap("bazKey", "bazValue"));
71+
72+
Message<List<Object>> batchMessage = builder.build();
73+
74+
List<Entry<Object, Map<String, Object>>> entries = new ArrayList<>();
75+
StandardBatchUtils.iterate(batchMessage).forEach(entry -> {
76+
entries.add(entry);
77+
});
78+
assertThat(entries.size()).isEqualTo(3);
79+
assertThat(entries.get(0).getKey()).isEqualTo("foo");
80+
assertThat(entries.get(0).getValue().get("fooKey")).isEqualTo("fooValue");
81+
82+
assertThat(entries.get(1).getKey()).isEqualTo("bar");
83+
assertThat(entries.get(1).getValue().get("barKey")).isEqualTo("barValue");
84+
85+
assertThat(entries.get(2).getKey()).isEqualTo("baz");
86+
assertThat(entries.get(2).getValue().get("bazKey")).isEqualTo("bazValue");
87+
}
5988
}

0 commit comments

Comments
 (0)