Skip to content

Commit ceee505

Browse files
committed
spring-projectsGH-3103: Introduce CloudEvents transformers
Fixes spring-projects#3103 * Add an `io.cloudevents:cloudevents-api` optional dependency * Introduce a `HeaderMapper` and `Marshallers` in the `support.cloudevents` to marshal `CloudEvent` instances * Introduce a `ToCloudEventTransformer` to build a `CloudEvent` instance from a `Message` and optional marshaling logic if necessary. Such a transformer could be used as a general purpose CE protocol binder before sending a result message into the target protocol channel adapter
1 parent 97702ae commit ceee505

File tree

6 files changed

+488
-0
lines changed

6 files changed

+488
-0
lines changed

Diff for: build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ ext {
5151
assertjVersion = '3.15.0'
5252
assertkVersion = '0.22'
5353
awaitilityVersion = '4.0.2'
54+
cloudEventsVersion = '1.3.0'
5455
commonsDbcp2Version = '2.7.0'
5556
commonsIoVersion = '2.6'
5657
commonsNetVersion = '3.6'
@@ -419,6 +420,7 @@ project('spring-integration-core') {
419420
optionalApi "io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion"
420421
optionalApi "org.apache.avro:avro:$avroVersion"
421422
optionalApi 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
423+
optionalApi "io.cloudevents:cloudevents-api:$cloudEventsVersion"
422424

423425
testImplementation ("org.aspectj:aspectjweaver:$aspectjVersion")
424426
testImplementation ('com.fasterxml.jackson.datatype:jackson-datatype-jsr310')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.cloudevents;
18+
19+
import java.util.AbstractMap;
20+
import java.util.Locale;
21+
import java.util.Map;
22+
import java.util.Optional;
23+
import java.util.stream.Collectors;
24+
25+
import org.springframework.messaging.MessageHeaders;
26+
import org.springframework.util.Assert;
27+
28+
import io.cloudevents.v1.ContextAttributes;
29+
30+
/**
31+
* A Cloud Event header mapper.
32+
*
33+
* @author Artem Bilan
34+
*
35+
* @since 5.3
36+
*/
37+
public class HeaderMapper {
38+
39+
/**
40+
* Cloud event headers prefix as a {@value HEADER_PREFIX}.
41+
*/
42+
public static final String HEADER_PREFIX = "ce_";
43+
44+
/**
45+
* Following the signature of {@link io.cloudevents.fun.FormatHeaderMapper}
46+
* @param attributes The map of attributes
47+
* @param extensions The map of extensions
48+
* @return The map of headers
49+
*/
50+
public static Map<String, String> map(Map<String, String> attributes, Map<String, String> extensions) {
51+
Assert.notNull(attributes, "'attributes' must noy be null");
52+
Assert.notNull(extensions, "'extensions' must noy be null");
53+
54+
Map<String, String> result =
55+
attributes.entrySet()
56+
.stream()
57+
.filter(attribute ->
58+
attribute.getValue() != null
59+
&& !ContextAttributes.datacontenttype.name().equals(attribute.getKey()))
60+
.map(header ->
61+
new AbstractMap.SimpleEntry<>(
62+
HEADER_PREFIX + header.getKey().toLowerCase(Locale.US),
63+
header.getValue()))
64+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
65+
66+
result.putAll(
67+
extensions.entrySet()
68+
.stream()
69+
.filter(extension -> extension.getValue() != null)
70+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
71+
);
72+
73+
Optional.ofNullable(attributes
74+
.get(ContextAttributes.datacontenttype.name()))
75+
.ifPresent((dataContentType) -> {
76+
result.put(MessageHeaders.CONTENT_TYPE, dataContentType);
77+
});
78+
79+
return result;
80+
}
81+
82+
private HeaderMapper() {
83+
}
84+
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.cloudevents;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.springframework.messaging.MessageHeaders;
23+
24+
import io.cloudevents.extensions.ExtensionFormat;
25+
import io.cloudevents.format.BinaryMarshaller;
26+
import io.cloudevents.format.StructuredMarshaller;
27+
import io.cloudevents.format.Wire;
28+
import io.cloudevents.format.builder.EventStep;
29+
import io.cloudevents.json.Json;
30+
import io.cloudevents.v1.Accessor;
31+
import io.cloudevents.v1.AttributesImpl;
32+
33+
/**
34+
* A Cloud Events general purpose marshallers factory.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 5.3
39+
*/
40+
public final class Marshallers {
41+
42+
private static final Map<String, String> NO_HEADERS = new HashMap<>();
43+
44+
/**
45+
* Builds a Binary Content Mode marshaller to marshal cloud events as JSON for
46+
* any Transport Binding.
47+
* @param <T> The data type
48+
* @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON
49+
* @see BinaryMarshaller
50+
*/
51+
public static <T> EventStep<AttributesImpl, T, byte[], String> binary() {
52+
return BinaryMarshaller.<AttributesImpl, T, byte[], String>builder()
53+
.map(AttributesImpl::marshal)
54+
.map(Accessor::extensionsOf)
55+
.map(ExtensionFormat::marshal)
56+
.map(HeaderMapper::map)
57+
.map(Json::binaryMarshal)
58+
.builder(Wire::new);
59+
}
60+
61+
/**
62+
* Builds a Structured Content Mode marshaller to marshal cloud event as JSON for
63+
* any Transport Binding.
64+
* @param <T> The data type
65+
* @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON
66+
* @see StructuredMarshaller
67+
*/
68+
public static <T> EventStep<AttributesImpl, T, byte[], String> structured() {
69+
return StructuredMarshaller.<AttributesImpl, T, byte[], String>
70+
builder()
71+
.mime(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json")
72+
.map((event) -> Json.binaryMarshal(event, NO_HEADERS))
73+
.skip();
74+
}
75+
76+
private Marshallers() {
77+
78+
}
79+
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes supporting for Cloud Events.
3+
*/
4+
package org.springframework.integration.support.cloudevents;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.transformer;
18+
19+
import java.net.URI;
20+
import java.time.ZonedDateTime;
21+
import java.util.UUID;
22+
23+
import org.springframework.expression.EvaluationContext;
24+
import org.springframework.expression.Expression;
25+
import org.springframework.integration.StaticMessageHeaderAccessor;
26+
import org.springframework.integration.expression.ExpressionUtils;
27+
import org.springframework.integration.expression.FunctionExpression;
28+
import org.springframework.integration.support.cloudevents.Marshallers;
29+
import org.springframework.lang.Nullable;
30+
import org.springframework.messaging.Message;
31+
import org.springframework.messaging.MessageHeaders;
32+
import org.springframework.util.Assert;
33+
import org.springframework.util.MimeType;
34+
35+
import io.cloudevents.CloudEvent;
36+
import io.cloudevents.extensions.ExtensionFormat;
37+
import io.cloudevents.format.Wire;
38+
import io.cloudevents.format.builder.EventStep;
39+
import io.cloudevents.v1.AttributesImpl;
40+
import io.cloudevents.v1.CloudEventBuilder;
41+
import io.cloudevents.v1.CloudEventImpl;
42+
43+
/**
44+
* An {@link AbstractTransformer} implementation to build a cloud event
45+
* from the request message.
46+
* <p>
47+
* This transformer may produce a message according a {@link ToCloudEventTransformer.Result} option.
48+
* By default it is a {@link ToCloudEventTransformer.Result#RAW}
49+
* with the meaning to produce a {@link io.cloudevents.CloudEvent}
50+
* instance as a reply message payload.
51+
* <p>
52+
* A {@link ToCloudEventTransformer.Result#BINARY} mode produces a marshalled into a {@code byte[]}
53+
* a built {@link io.cloudevents.CloudEvent} body and respective cloud event headers.
54+
* <p>
55+
* A {@link ToCloudEventTransformer.Result#STRUCTURED} mode produces a marshalled into a {@code byte[]}
56+
* a whole {@link io.cloudevents.CloudEvent} and respective content type header
57+
* with the {@code "application/cloudevents+json"} value.
58+
*
59+
* @author Artem Bilan
60+
*
61+
* @since 5.3
62+
*/
63+
public class ToCloudEventTransformer extends AbstractTransformer {
64+
65+
public enum Result {
66+
67+
RAW, BINARY, STRUCTURED
68+
69+
}
70+
71+
private final URI source;
72+
73+
@Nullable
74+
private final EventStep<AttributesImpl, Object, byte[], String> wireBuilder;
75+
76+
private Expression typeExpression =
77+
new FunctionExpression<Message<?>>((message) -> message.getPayload().getClass().getName());
78+
79+
@Nullable
80+
private Expression subjectExpression;
81+
82+
@Nullable
83+
private Expression dataSchemaExpression;
84+
85+
@Nullable
86+
private Expression extensionExpression;
87+
88+
private EvaluationContext evaluationContext;
89+
90+
public ToCloudEventTransformer(URI source) {
91+
this(source, Result.RAW);
92+
}
93+
94+
public ToCloudEventTransformer(URI source, Result resultMode) {
95+
Assert.notNull(source, "'source' must not be null");
96+
Assert.notNull(resultMode, "'resultMode' must not be null");
97+
this.source = source;
98+
switch (resultMode) {
99+
case BINARY:
100+
this.wireBuilder = Marshallers.binary();
101+
break;
102+
case STRUCTURED:
103+
this.wireBuilder = Marshallers.structured();
104+
break;
105+
default:
106+
this.wireBuilder = null;
107+
}
108+
}
109+
110+
public void setTypeExpression(Expression typeExpression) {
111+
Assert.notNull(source, "'typeExpression' must not be null");
112+
this.typeExpression = typeExpression;
113+
}
114+
115+
public void setSubjectExpression(@Nullable Expression subjectExpression) {
116+
this.subjectExpression = subjectExpression;
117+
}
118+
119+
public void setDataSchemaExpression(@Nullable Expression dataSchemaExpression) {
120+
this.dataSchemaExpression = dataSchemaExpression;
121+
}
122+
123+
public void setExtensionExpression(@Nullable Expression extensionExpression) {
124+
this.extensionExpression = extensionExpression;
125+
}
126+
127+
@Override
128+
protected void onInit() {
129+
super.onInit();
130+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
131+
}
132+
133+
@Override
134+
protected Object doTransform(Message<?> message) {
135+
CloudEventImpl<Object> cloudEvent = buildCloudEvent(message);
136+
137+
if (this.wireBuilder != null) {
138+
Wire<byte[], String, String> wire =
139+
this.wireBuilder.withEvent(() -> cloudEvent)
140+
.marshal();
141+
142+
return getMessageBuilderFactory()
143+
.withPayload(wire.getPayload().orElse(new byte[0]))
144+
.copyHeaders(wire.getHeaders())
145+
.copyHeadersIfAbsent(message.getHeaders())
146+
.build();
147+
}
148+
else {
149+
return cloudEvent;
150+
}
151+
}
152+
153+
@SuppressWarnings("unchecked")
154+
private CloudEventImpl<Object> buildCloudEvent(Message<?> message) {
155+
MessageHeaders headers = message.getHeaders();
156+
Object payload = message.getPayload();
157+
158+
CloudEventBuilder<Object> cloudEventBuilder =
159+
payload instanceof CloudEvent
160+
? CloudEventBuilder.builder((CloudEvent<AttributesImpl, Object>) payload)
161+
: CloudEventBuilder.builder();
162+
163+
cloudEventBuilder.withId(headers.getId() != null
164+
? headers.getId().toString()
165+
: UUID.randomUUID().toString())
166+
.withTime(ZonedDateTime.now())
167+
.withSource(this.source)
168+
.withType(this.typeExpression.getValue(this.evaluationContext, message, String.class));
169+
170+
if (!(payload instanceof CloudEvent)) {
171+
if (payload instanceof byte[]) {
172+
cloudEventBuilder.withDataBase64((byte[]) payload);
173+
}
174+
else {
175+
cloudEventBuilder.withData(payload);
176+
}
177+
}
178+
179+
MimeType contentType = StaticMessageHeaderAccessor.getContentType(message);
180+
181+
if (contentType != null) {
182+
cloudEventBuilder.withDataContentType(contentType.toString());
183+
}
184+
185+
if (this.subjectExpression != null) {
186+
cloudEventBuilder.withSubject(
187+
this.subjectExpression.getValue(this.evaluationContext, message, String.class));
188+
}
189+
190+
if (this.dataSchemaExpression != null) {
191+
cloudEventBuilder.withDataschema(
192+
this.dataSchemaExpression.getValue(this.evaluationContext, message, URI.class));
193+
}
194+
195+
if (this.extensionExpression != null) {
196+
cloudEventBuilder.withExtension(
197+
this.extensionExpression.getValue(this.evaluationContext, message, ExtensionFormat.class));
198+
}
199+
200+
return cloudEventBuilder.build();
201+
}
202+
203+
}

0 commit comments

Comments
 (0)