Skip to content
This repository was archived by the owner on Nov 20, 2024. It is now read-only.

Commit e60db26

Browse files
committed
GH-331 Initial support for binder deployer
Resolves #331
1 parent ff632a4 commit e60db26

File tree

6 files changed

+88
-27
lines changed

6 files changed

+88
-27
lines changed

spring-cloud-stream-binder-rabbit-deployer/pom.xml

+6-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<parent>
1111
<groupId>org.springframework.cloud</groupId>
1212
<artifactId>spring-cloud-stream-binder-rabbit-parent</artifactId>
13-
<version>3.1.2-SNAPSHOT</version>
13+
<version>3.2.0-SNAPSHOT</version>
1414
</parent>
1515

1616
<dependencies>
@@ -23,6 +23,11 @@
2323
<artifactId>spring-cloud-function-deployer</artifactId>
2424
<version>${project.version}</version>
2525
</dependency>
26+
<dependency>
27+
<groupId>org.springframework.cloud</groupId>
28+
<artifactId>spring-cloud-function-rsocket</artifactId>
29+
<version>${project.version}</version>
30+
</dependency>
2631
<dependency>
2732
<groupId>org.springframework.boot</groupId>
2833
<artifactId>spring-boot-configuration-processor</artifactId>

spring-cloud-stream-binder-rabbit-deployer/src/main/java/org/springframework/cloud/stream/binder/rabbit/deployer/RabbitDeployer.java

+39-15
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,19 @@
1616

1717
package org.springframework.cloud.stream.binder.rabbit.deployer;
1818

19+
import java.util.function.Consumer;
1920
import java.util.function.Function;
2021

2122
import org.springframework.boot.SpringApplication;
2223
import org.springframework.boot.autoconfigure.SpringBootApplication;
24+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
25+
import org.springframework.cloud.function.context.FunctionProperties;
26+
import org.springframework.cloud.stream.function.StreamBridge;
2327
import org.springframework.context.annotation.Bean;
28+
import org.springframework.core.env.Environment;
29+
import org.springframework.messaging.Message;
30+
import org.springframework.messaging.rsocket.RSocketRequester;
31+
import org.springframework.messaging.support.MessageBuilder;
2432

2533

2634
/**
@@ -32,24 +40,42 @@
3240
@SpringBootApplication
3341
public class RabbitDeployer {
3442

43+
/*
44+
* SpringApplication.run(RabbitDeployer.class,
45+
"--spring.cloud.function.definition=reverseFunction",
46+
"--spring.cloud.function.location=/bootjar-1.0.0.RELEASE-exec.jar",
47+
"--spring.cloud.function.function-class=function.example.ReverseFunction"
48+
);
49+
*/
3550
public static void main(String[] args) {
36-
SpringApplication.run(RabbitDeployer.class,
37-
//target/it/bootjar/target/bootjar-1.0.0.RELEASE-exec.jar
38-
"--spring.cloud.function.definition=reverseFunction;echo",
39-
"--spring.cloud.function.location=/Users/ozhurakousky/dev/repo/spring-cloud-function/spring-cloud-function-deployer/target/it/bootjar/target/bootjar-1.0.0.RELEASE-exec.jar",
40-
"--spring.cloud.function.function-class=function.example.ReverseFunction");
51+
SpringApplication.run(RabbitDeployer.class);
4152
}
4253

43-
4454
@Bean
45-
public Function<String, String> echo() {
46-
// java -jar timesource-kafka.jar
47-
48-
// java -jar rabbit-bundle.jar -Dspring.cloud.function.location=.....jar
49-
// java -jar rabbit-kafka-bundle.jar -Dspring.cloud.function.location=.....jar
55+
@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
56+
public Function<Message<byte[]>, Message<byte[]>> gateway(StreamBridge bridge) {
57+
return message -> {
58+
String destinationName = (String) message.getHeaders().get("target_destination");
59+
bridge.send(destinationName, message);
60+
return MessageBuilder.withPayload("Successfully sent to reverseFunction-in-0".getBytes()).build();
61+
};
62+
}
5063

51-
// java -jar rabbit-rsocket-bundle.jar = GATEWAY
52-
return v -> v;
64+
/*
65+
* Just like any other stream bean. This one will subscribe to broker destination (using regular stream mechanisms)
66+
* and using some configuration provided by the user will propagate message to remote (rsocket) subscriber
67+
*/
68+
@Bean
69+
@ConditionalOnProperty(name = FunctionProperties.PREFIX + ".rsocket.enabled", matchIfMissing = true)
70+
public Consumer<Message<byte[]>> delegatingConsumer(RSocketRequester.Builder rsocketRequesterBuilder, Environment environment) {
71+
String host = environment.getProperty("spring.cloud.function.rsocket.subscriber.host");
72+
String port = environment.getProperty("spring.cloud.function.rsocket.subscriber.port");
73+
return message -> {
74+
// rsocketRequesterBuilder.tcp("host", Integer.valueOf(port))
75+
// .route("pojoToString")
76+
// .data(message)
77+
// .retrieveMono(String.class);
78+
};
5379
}
5480

5581
// Step-1 - rabbit-bundle.jar(time) | rabbit-bundle.jar(log) - Step One - local
@@ -59,6 +85,4 @@ public Function<String, String> echo() {
5985

6086
//http | rabbit-rsocket-bundle.jar(producer) | python | rabbit-rsocket-bundle.jar(consumer) | rabbit-bundle.jar(log)
6187

62-
//http => pyjon
63-
6488
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2021-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.rabbit.deployer;
18+
19+
import org.springframework.boot.SpringApplication;
20+
import org.springframework.boot.env.EnvironmentPostProcessor;
21+
import org.springframework.core.env.ConfigurableEnvironment;
22+
23+
/**
24+
*
25+
* @author Oleg Zhurakousky
26+
*
27+
* @since 3.2
28+
*
29+
*/
30+
class RabbitDeployerEnvironmentPostProcessor implements EnvironmentPostProcessor {
31+
32+
@Override
33+
public void postProcessEnvironment(ConfigurableEnvironment environment,
34+
SpringApplication application) {
35+
if (!environment.containsProperty("spring.cloud.function.rsocket.enabled")) {
36+
environment.getSystemProperties().putIfAbsent("spring.cloud.function.rsocket.enabled", false);
37+
}
38+
39+
}
40+
41+
}

spring-cloud-stream-binder-rabbit-deployer/src/main/resources/META-INF/spring.binders

-2
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration
1+
org.springframework.boot.env.EnvironmentPostProcessor:\
2+
org.springframework.cloud.stream.binder.rabbit.deployer.RabbitDeployerEnvironmentPostProcessor

spring-cloud-stream-binder-rabbit-deployer/src/test/resources/log4j.properties

-8
This file was deleted.

0 commit comments

Comments
 (0)