Skip to content

Commit b33944b

Browse files
committed
Add RSocket server support with Spring Messaging
This commit adds support for RSocket server applications. The auto-configuration will either add RSocket support to an existing Reactor Netty server in a WebFlux application (as a WebSocket endpoint), or bootstrap a brand new RSocket server instance. Spring Boot will also auto-configure the Spring Messaging infrastructure that supports Controller beans with `@MessageMapping` annotated methods. Fixes gh-16021
1 parent 5e58f4a commit b33944b

28 files changed

+1776
-6
lines changed

spring-boot-project/spring-boot-autoconfigure/pom.xml

+15-5
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,21 @@
112112
<artifactId>reactor-netty</artifactId>
113113
<optional>true</optional>
114114
</dependency>
115+
<dependency>
116+
<groupId>io.rsocket</groupId>
117+
<artifactId>rsocket-core</artifactId>
118+
<optional>true</optional>
119+
</dependency>
120+
<dependency>
121+
<groupId>io.rsocket</groupId>
122+
<artifactId>rsocket-transport-netty</artifactId>
123+
<optional>true</optional>
124+
</dependency>
125+
<dependency>
126+
<groupId>io.searchbox</groupId>
127+
<artifactId>jest</artifactId>
128+
<optional>true</optional>
129+
</dependency>
115130
<dependency>
116131
<groupId>jakarta.json.bind</groupId>
117132
<artifactId>jakarta.json.bind-api</artifactId>
@@ -127,11 +142,6 @@
127142
<artifactId>money-api</artifactId>
128143
<optional>true</optional>
129144
</dependency>
130-
<dependency>
131-
<groupId>io.searchbox</groupId>
132-
<artifactId>jest</artifactId>
133-
<optional>true</optional>
134-
</dependency>
135145
<dependency>
136146
<groupId>org.apache.kafka</groupId>
137147
<artifactId>kafka-streams</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2012-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.rsocket;
18+
19+
import io.rsocket.RSocketFactory;
20+
import io.rsocket.transport.netty.server.TcpServerTransport;
21+
22+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
23+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
24+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
25+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
26+
import org.springframework.context.annotation.Bean;
27+
import org.springframework.context.annotation.Configuration;
28+
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
29+
import org.springframework.messaging.rsocket.RSocketRequester;
30+
import org.springframework.messaging.rsocket.RSocketStrategies;
31+
32+
/**
33+
* {@link EnableAutoConfiguration} for Spring RSocket support in Spring Messaging.
34+
*
35+
* @author Brian Clozel
36+
* @since 2.2.0
37+
*/
38+
@Configuration(proxyBeanMethods = false)
39+
@ConditionalOnClass({ RSocketRequester.class, RSocketFactory.class,
40+
TcpServerTransport.class })
41+
@AutoConfigureAfter(RSocketStrategiesAutoConfiguration.class)
42+
public class RSocketMessagingAutoConfiguration {
43+
44+
@Bean
45+
@ConditionalOnMissingBean
46+
public MessageHandlerAcceptor messageHandlerAcceptor(
47+
RSocketStrategies rSocketStrategies) {
48+
MessageHandlerAcceptor acceptor = new MessageHandlerAcceptor();
49+
acceptor.setRSocketStrategies(rSocketStrategies);
50+
return acceptor;
51+
}
52+
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2012-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.rsocket;
18+
19+
import io.rsocket.RSocketFactory;
20+
import io.rsocket.transport.ServerTransport;
21+
import io.rsocket.transport.netty.server.WebsocketRouteTransport;
22+
import reactor.netty.http.server.HttpServer;
23+
24+
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer;
25+
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
26+
27+
/**
28+
* {@libnk NettyServerCustomizer} that configures an RSocket Websocket endpoint.
29+
*
30+
* @author Brian Clozel
31+
*/
32+
class RSocketNettyServerCustomizer implements NettyServerCustomizer {
33+
34+
private final String mappingPath;
35+
36+
private final MessageHandlerAcceptor messageHandlerAcceptor;
37+
38+
RSocketNettyServerCustomizer(String mappingPath,
39+
MessageHandlerAcceptor messageHandlerAcceptor) {
40+
this.mappingPath = mappingPath;
41+
this.messageHandlerAcceptor = messageHandlerAcceptor;
42+
}
43+
44+
@Override
45+
public HttpServer apply(HttpServer httpServer) {
46+
final ServerTransport.ConnectionAcceptor acceptor = RSocketFactory.receive()
47+
.acceptor(this.messageHandlerAcceptor).toConnectionAcceptor();
48+
return httpServer.route((routes) -> routes.ws(this.mappingPath,
49+
WebsocketRouteTransport.newHandler(acceptor)));
50+
}
51+
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2012-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.rsocket;
18+
19+
import java.net.InetAddress;
20+
21+
import org.springframework.boot.context.properties.ConfigurationProperties;
22+
23+
/**
24+
* {@link ConfigurationProperties properties} for RSocket support.
25+
*
26+
* @author Brian Clozel
27+
* @since 2.2.0
28+
*/
29+
@ConfigurationProperties("spring.rsocket")
30+
public class RSocketProperties {
31+
32+
private Server server = new Server();
33+
34+
public Server getServer() {
35+
return this.server;
36+
}
37+
38+
static class Server {
39+
40+
/**
41+
* Server port.
42+
*/
43+
private Integer port;
44+
45+
/**
46+
* Network address to which the server should bind.
47+
*/
48+
private InetAddress address;
49+
50+
/**
51+
* RSocket transport protocol.
52+
*/
53+
private Transport transport = Transport.TCP;
54+
55+
/**
56+
* Path under which RSocket handles requests (only works with websocket
57+
* transport).
58+
*/
59+
private String mappingPath;
60+
61+
public Integer getPort() {
62+
return this.port;
63+
}
64+
65+
public void setPort(Integer port) {
66+
this.port = port;
67+
}
68+
69+
public InetAddress getAddress() {
70+
return this.address;
71+
}
72+
73+
public void setAddress(InetAddress address) {
74+
this.address = address;
75+
}
76+
77+
public Transport getTransport() {
78+
return this.transport;
79+
}
80+
81+
public void setTransport(Transport transport) {
82+
this.transport = transport;
83+
}
84+
85+
public String getMappingPath() {
86+
return this.mappingPath;
87+
}
88+
89+
public void setMappingPath(String mappingPath) {
90+
this.mappingPath = mappingPath;
91+
}
92+
93+
public enum Transport {
94+
95+
TCP, WEBSOCKET
96+
97+
}
98+
99+
}
100+
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright 2012-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.rsocket;
18+
19+
import java.util.stream.Collectors;
20+
21+
import io.netty.buffer.PooledByteBufAllocator;
22+
import io.rsocket.RSocketFactory;
23+
24+
import org.springframework.beans.factory.ObjectProvider;
25+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
26+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
27+
import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
28+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
29+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
30+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
31+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
32+
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
33+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
34+
import org.springframework.boot.context.properties.PropertyMapper;
35+
import org.springframework.boot.rsocket.netty.NettyRSocketBootstrap;
36+
import org.springframework.boot.rsocket.netty.NettyRSocketServerFactory;
37+
import org.springframework.boot.rsocket.server.RSocketServerFactory;
38+
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
39+
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
40+
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
41+
import org.springframework.context.annotation.Bean;
42+
import org.springframework.context.annotation.Conditional;
43+
import org.springframework.context.annotation.Configuration;
44+
import org.springframework.http.client.reactive.ReactorResourceFactory;
45+
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
46+
import org.springframework.messaging.rsocket.RSocketStrategies;
47+
48+
/**
49+
* {@link EnableAutoConfiguration Auto-configuration} for RSocket servers. In the case of
50+
* {@link org.springframework.boot.WebApplicationType#REACTIVE}, the RSocket server is
51+
* added as a WebSocket endpoint on the existing
52+
* {@link org.springframework.boot.web.embedded.netty.NettyWebServer}. If a specific
53+
* server port is configured, a new standalone RSocket server is created.
54+
*
55+
* @author Brian Clozel
56+
* @since 2.2.0
57+
*/
58+
@Configuration(proxyBeanMethods = false)
59+
@ConditionalOnClass({ RSocketFactory.class, RSocketStrategies.class,
60+
PooledByteBufAllocator.class })
61+
@ConditionalOnBean(MessageHandlerAcceptor.class)
62+
@AutoConfigureAfter(RSocketStrategiesAutoConfiguration.class)
63+
@EnableConfigurationProperties(RSocketProperties.class)
64+
public class RSocketServerAutoConfiguration {
65+
66+
@Conditional(OnRSocketWebServerCondition.class)
67+
@Configuration(proxyBeanMethods = false)
68+
static class WebFluxServerAutoConfiguration {
69+
70+
@Bean
71+
public WebServerFactoryCustomizer<NettyReactiveWebServerFactory> rSocketWebsocketCustomizer(
72+
RSocketProperties properties,
73+
MessageHandlerAcceptor messageHandlerAcceptor) {
74+
RSocketNettyServerCustomizer customizer = new RSocketNettyServerCustomizer(
75+
properties.getServer().getMappingPath(), messageHandlerAcceptor);
76+
return (factory) -> factory.addServerCustomizers(customizer);
77+
}
78+
79+
}
80+
81+
@ConditionalOnProperty(prefix = "spring.rsocket.server", name = "port")
82+
@Configuration(proxyBeanMethods = false)
83+
static class EmbeddedServerAutoConfiguration {
84+
85+
@Bean
86+
@ConditionalOnMissingBean
87+
public ReactorResourceFactory reactorServerResourceFactory() {
88+
return new ReactorResourceFactory();
89+
}
90+
91+
@Bean
92+
@ConditionalOnMissingBean
93+
public RSocketServerFactory rSocketServerFactory(RSocketProperties properties,
94+
ReactorResourceFactory resourceFactory,
95+
ObjectProvider<ServerRSocketFactoryCustomizer> customizers) {
96+
NettyRSocketServerFactory factory = new NettyRSocketServerFactory();
97+
factory.setResourceFactory(resourceFactory);
98+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
99+
map.from(properties.getServer().getAddress()).to(factory::setAddress);
100+
map.from(properties.getServer().getPort()).to(factory::setPort);
101+
factory.setServerCustomizers(
102+
customizers.orderedStream().collect(Collectors.toList()));
103+
return factory;
104+
}
105+
106+
@Bean
107+
public NettyRSocketBootstrap nettyRSocketBootstrap(
108+
RSocketServerFactory rSocketServerFactory,
109+
MessageHandlerAcceptor messageHandlerAcceptor) {
110+
return new NettyRSocketBootstrap(rSocketServerFactory,
111+
messageHandlerAcceptor);
112+
}
113+
114+
}
115+
116+
static class OnRSocketWebServerCondition extends AllNestedConditions {
117+
118+
OnRSocketWebServerCondition() {
119+
super(ConfigurationPhase.REGISTER_BEAN);
120+
}
121+
122+
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
123+
static class IsReactiveWebApplication {
124+
125+
}
126+
127+
@ConditionalOnProperty(prefix = "spring.rsocket.server", name = "port",
128+
matchIfMissing = true)
129+
static class HasNoPortConfigured {
130+
131+
}
132+
133+
@ConditionalOnProperty(prefix = "spring.rsocket.server", name = "mapping-path")
134+
static class HasMappingPathConfigured {
135+
136+
}
137+
138+
@ConditionalOnProperty(prefix = "spring.rsocket.server", name = "transport",
139+
havingValue = "websocket")
140+
static class HasWebsocketTransportConfigured {
141+
142+
}
143+
144+
}
145+
146+
}

0 commit comments

Comments
 (0)