Skip to content

Commit 982d39d

Browse files
yawkatsdelamo
andauthored
Add support for WebSocketFrame parameters in OnMessage (#11673)
Fixes #11506 Co-authored-by: Sergio del Amo <[email protected]>
1 parent 19da2da commit 982d39d

File tree

2 files changed

+79
-25
lines changed

2 files changed

+79
-25
lines changed

http-netty/src/main/java/io/micronaut/http/netty/websocket/AbstractNettyWebSocketHandler.java

+27-21
Original file line numberDiff line numberDiff line change
@@ -319,30 +319,36 @@ protected void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame ms
319319
CloseReason.UNSUPPORTED_DATA
320320
);
321321
} else {
322-
ByteBuf msgContent = msg.content().retain();
323-
if (!msg.isFinalFragment()) {
324-
frameBuffer.updateAndGet((buffer) -> {
325-
if (buffer == null) {
326-
buffer = ctx.alloc().compositeBuffer();
327-
}
328-
buffer.addComponent(true, msgContent);
329-
return buffer;
330-
});
331-
return;
332-
}
322+
Argument<?> bodyArgument = this.getBodyArgument();
323+
Object data;
333324

334-
ByteBuf content;
335-
CompositeByteBuf buffer = frameBuffer.getAndSet(null);
336-
if (buffer == null) {
337-
content = msgContent;
325+
if (WebSocketFrame.class.isAssignableFrom(bodyArgument.getType())) {
326+
data = msg.retain();
338327
} else {
339-
buffer.addComponent(true, msgContent);
340-
content = buffer;
341-
}
328+
ByteBuf msgContent = msg.content().retain();
329+
if (!msg.isFinalFragment()) {
330+
frameBuffer.updateAndGet((buffer) -> {
331+
if (buffer == null) {
332+
buffer = ctx.alloc().compositeBuffer();
333+
}
334+
buffer.addComponent(true, msgContent);
335+
return buffer;
336+
});
337+
return;
338+
}
342339

343-
Argument<?> bodyArgument = this.getBodyArgument();
344-
Object data = conversionService.convert(content, ByteBuf.class, bodyArgument).orElse(null);
345-
content.release();
340+
ByteBuf content;
341+
CompositeByteBuf buffer = frameBuffer.getAndSet(null);
342+
if (buffer == null) {
343+
content = msgContent;
344+
} else {
345+
buffer.addComponent(true, msgContent);
346+
content = buffer;
347+
}
348+
349+
data = conversionService.convert(content, ByteBuf.class, bodyArgument).orElse(null);
350+
content.release();
351+
}
346352

347353
if (data == null) {
348354
MediaType mediaType;

http-server-netty/src/test/groovy/io/micronaut/http/server/netty/websocket/WebSocketSpec.groovy

+52-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package io.micronaut.http.server.netty.websocket
22

33
import io.micronaut.context.ApplicationContext
44
import io.micronaut.context.annotation.Requires
5-
import io.micronaut.core.annotation.NonNull
65
import io.micronaut.http.HttpRequest
76
import io.micronaut.http.HttpResponse
87
import io.micronaut.http.annotation.Filter
@@ -11,26 +10,29 @@ import io.micronaut.http.filter.HttpFilter
1110
import io.micronaut.http.server.netty.EmbeddedTestUtil
1211
import io.micronaut.http.server.netty.NettyHttpServer
1312
import io.micronaut.runtime.server.EmbeddedServer
13+
import io.micronaut.websocket.WebSocketClient
1414
import io.micronaut.websocket.WebSocketSession
15+
import io.micronaut.websocket.annotation.ClientWebSocket
1516
import io.micronaut.websocket.annotation.OnMessage
1617
import io.micronaut.websocket.annotation.ServerWebSocket
17-
import io.netty.channel.ChannelHandlerContext
18-
import io.netty.channel.SimpleChannelInboundHandler
1918
import io.netty.channel.embedded.EmbeddedChannel
2019
import io.netty.handler.codec.http.DefaultHttpHeaders
21-
import io.netty.handler.codec.http.FullHttpResponse
2220
import io.netty.handler.codec.http.HttpClientCodec
2321
import io.netty.handler.codec.http.HttpObjectAggregator
2422
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
2523
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
24+
import io.netty.handler.codec.http.websocketx.WebSocketFrame
2625
import io.netty.handler.codec.http.websocketx.WebSocketVersion
2726
import jakarta.inject.Singleton
2827
import org.reactivestreams.Publisher
2928
import reactor.core.publisher.Flux
29+
import reactor.core.publisher.Mono
3030
import reactor.core.publisher.Sinks
3131
import spock.lang.Issue
3232
import spock.lang.Specification
3333

34+
import java.util.concurrent.CompletableFuture
35+
3436
class WebSocketSpec extends Specification {
3537
@Issue('https://github.com/micronaut-projects/micronaut-core/issues/7920')
3638
def 'race condition with channel close from http filter'() {
@@ -99,4 +101,50 @@ class WebSocketSpec extends Specification {
99101
return Flux.concat(delay.asMono(), Flux.defer(() -> chain.proceed(request)))
100102
}
101103
}
104+
105+
@Issue('https://github.com/micronaut-projects/micronaut-core/issues/11506')
106+
def 'netty WebSocketFrame'() {
107+
given:
108+
ApplicationContext ctx = ApplicationContext.run([
109+
'spec.name': 'WebSocketSpec2',
110+
])
111+
def embeddedServer = ctx.getBean(EmbeddedServer)
112+
embeddedServer.start()
113+
def client = ctx.createBean(WebSocketClient, embeddedServer.URI)
114+
115+
when:
116+
def sock = Mono.from(client.connect(ClientSocket, "/ServerSocket")).block()
117+
sock.send("foo")
118+
then:
119+
sock.reply.get() == "reply: foo"
120+
121+
cleanup:
122+
sock.close()
123+
client.close()
124+
embeddedServer.close()
125+
ctx.close()
126+
}
127+
128+
@ServerWebSocket('/ServerSocket')
129+
@Requires(property = 'spec.name', value = 'WebSocketSpec2')
130+
static class ServerSocket {
131+
@OnMessage
132+
def onMessage(WebSocketFrame message, WebSocketSession session) {
133+
def text = ((TextWebSocketFrame) message).text()
134+
message.release()
135+
return session.send('reply: ' + text)
136+
}
137+
}
138+
139+
@ClientWebSocket
140+
static abstract class ClientSocket implements AutoCloseable {
141+
def reply = new CompletableFuture<String>()
142+
143+
abstract void send(String message);
144+
145+
@OnMessage
146+
public void onMessage(String message) {
147+
reply.complete(message)
148+
}
149+
}
102150
}

0 commit comments

Comments
 (0)