1
1
/*
2
- * Copyright 2002-2023 the original author or authors.
2
+ * Copyright 2002-2024 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
@@ -68,12 +68,13 @@ final class WebSocketGraphQlTransport implements GraphQlTransport {
68
68
69
69
private final Mono <GraphQlSession > graphQlSessionMono ;
70
70
71
- private final long keepalive ;
71
+ @ Nullable
72
+ private final Duration keepAlive ;
72
73
73
74
74
75
WebSocketGraphQlTransport (
75
76
URI url , @ Nullable HttpHeaders headers , WebSocketClient client , CodecConfigurer codecConfigurer ,
76
- WebSocketGraphQlClientInterceptor interceptor , long keepalive ) {
77
+ WebSocketGraphQlClientInterceptor interceptor , @ Nullable Duration keepAlive ) {
77
78
78
79
Assert .notNull (url , "URI is required" );
79
80
Assert .notNull (client , "WebSocketClient is required" );
@@ -83,9 +84,9 @@ final class WebSocketGraphQlTransport implements GraphQlTransport {
83
84
this .url = url ;
84
85
this .headers .putAll ((headers != null ) ? headers : HttpHeaders .EMPTY );
85
86
this .webSocketClient = client ;
86
- this .keepalive = keepalive ;
87
+ this .keepAlive = keepAlive ;
87
88
88
- this .graphQlSessionHandler = new GraphQlSessionHandler (codecConfigurer , interceptor , keepalive );
89
+ this .graphQlSessionHandler = new GraphQlSessionHandler (codecConfigurer , interceptor , keepAlive );
89
90
90
91
this .graphQlSessionMono = initGraphQlSession (this .url , this .headers , client , this .graphQlSessionHandler )
91
92
.cacheInvalidateWhen (GraphQlSession ::notifyWhenClosed );
@@ -166,8 +167,9 @@ public Flux<GraphQlResponse> executeSubscription(GraphQlRequest request) {
166
167
return this .graphQlSessionMono .flatMapMany ((session ) -> session .executeSubscription (request ));
167
168
}
168
169
169
- public long getKeepAlive () {
170
- return keepalive ;
170
+ @ Nullable
171
+ Duration getKeepAlive () {
172
+ return this .keepAlive ;
171
173
}
172
174
173
175
@@ -191,15 +193,18 @@ private static class GraphQlSessionHandler implements WebSocketHandler {
191
193
192
194
private final AtomicBoolean stopped = new AtomicBoolean ();
193
195
194
- private final long keepalive ;
196
+ @ Nullable
197
+ private final Duration keepAlive ;
198
+
195
199
200
+ GraphQlSessionHandler (
201
+ CodecConfigurer codecConfigurer , WebSocketGraphQlClientInterceptor interceptor ,
202
+ @ Nullable Duration keepAlive ) {
196
203
197
- GraphQlSessionHandler (CodecConfigurer codecConfigurer , WebSocketGraphQlClientInterceptor interceptor ,
198
- long keepalive ) {
199
204
this .codecDelegate = new CodecDelegate (codecConfigurer );
200
205
this .interceptor = interceptor ;
201
206
this .graphQlSessionSink = Sinks .unsafe ().one ();
202
- this .keepalive = keepalive ;
207
+ this .keepAlive = keepAlive ;
203
208
}
204
209
205
210
@@ -257,7 +262,7 @@ public Mono<Void> handle(WebSocketSession session) {
257
262
session .send (connectionInitMono .concatWith (graphQlSession .getRequestFlux ())
258
263
.map ((message ) -> this .codecDelegate .encode (session , message )));
259
264
260
- Flux <Void > receiveCompletion = session .receive ()
265
+ Mono <Void > receiveCompletion = session .receive ()
261
266
.flatMap ((webSocketMessage ) -> {
262
267
if (sessionNotInitialized ()) {
263
268
try {
@@ -303,20 +308,22 @@ public Mono<Void> handle(WebSocketSession session) {
303
308
}
304
309
}
305
310
return Mono .empty ();
306
- });
307
-
308
- if (keepalive > 0 ) {
309
- Duration keepAliveDuration = Duration .ofSeconds (keepalive );
310
- receiveCompletion = receiveCompletion
311
- .mergeWith (Flux .interval (keepAliveDuration , keepAliveDuration )
312
- .flatMap (i -> {
313
- graphQlSession .sendPing (null );
314
- return Mono .empty ();
315
- })
316
- );
311
+ })
312
+ .mergeWith ((this .keepAlive != null ) ?
313
+ Flux .interval (this .keepAlive , this .keepAlive )
314
+ .filter ((aLong ) -> graphQlSession .checkSentOrReceivedMessagesAndClear ())
315
+ .doOnNext ((aLong ) -> graphQlSession .sendPing ())
316
+ .then () :
317
+ Flux .empty ())
318
+ .then ();
319
+
320
+ if (this .keepAlive != null ) {
321
+ Flux .interval (this .keepAlive , this .keepAlive )
322
+ .filter ((aLong ) -> graphQlSession .checkSentOrReceivedMessagesAndClear ())
323
+ .doOnNext ((aLong ) -> graphQlSession .sendPing ())
324
+ .subscribe ();
317
325
}
318
326
319
-
320
327
return Mono .zip (sendCompletion , receiveCompletion .then ()).then ();
321
328
}
322
329
@@ -413,6 +420,8 @@ private static class GraphQlSession {
413
420
414
421
private final Map <String , RequestState > requestStateMap = new ConcurrentHashMap <>();
415
422
423
+ private boolean hasReceivedMessages ;
424
+
416
425
417
426
GraphQlSession (WebSocketSession webSocketSession ) {
418
427
this .connection = DisposableConnection .from (webSocketSession );
@@ -483,11 +492,16 @@ void sendPong(@Nullable Map<String, Object> payload) {
483
492
this .requestSink .sendRequest (message );
484
493
}
485
494
486
- public void sendPing (@ Nullable Map < String , Object > payload ) {
487
- GraphQlWebSocketMessage message = GraphQlWebSocketMessage .ping (payload );
495
+ void sendPing () {
496
+ GraphQlWebSocketMessage message = GraphQlWebSocketMessage .ping (null );
488
497
this .requestSink .sendRequest (message );
489
498
}
490
499
500
+ boolean checkSentOrReceivedMessagesAndClear () {
501
+ boolean received = this .hasReceivedMessages ;
502
+ this .hasReceivedMessages = false ;
503
+ return (this .requestSink .checkSentMessagesAndClear () || received );
504
+ }
491
505
492
506
// Inbound messages
493
507
@@ -504,6 +518,8 @@ void handleNext(GraphQlWebSocketMessage message) {
504
518
return ;
505
519
}
506
520
521
+ this .hasReceivedMessages = true ;
522
+
507
523
if (requestState instanceof SingleResponseRequestState ) {
508
524
this .requestStateMap .remove (id );
509
525
}
@@ -631,6 +647,8 @@ private static final class RequestSink {
631
647
@ Nullable
632
648
private FluxSink <GraphQlWebSocketMessage > requestSink ;
633
649
650
+ private boolean hasSentMessages ;
651
+
634
652
private final Flux <GraphQlWebSocketMessage > requestFlux = Flux .create ((sink ) -> {
635
653
Assert .state (this .requestSink == null , "Expected single subscriber only for outbound messages" );
636
654
this .requestSink = sink ;
@@ -642,9 +660,16 @@ Flux<GraphQlWebSocketMessage> getRequestFlux() {
642
660
643
661
void sendRequest (GraphQlWebSocketMessage message ) {
644
662
Assert .state (this .requestSink != null , "Unexpected request before Flux is subscribed to" );
663
+ this .hasSentMessages = true ;
645
664
this .requestSink .next (message );
646
665
}
647
666
667
+ boolean checkSentMessagesAndClear () {
668
+ boolean result = this .hasSentMessages ;
669
+ this .hasSentMessages = false ;
670
+ return result ;
671
+ }
672
+
648
673
}
649
674
650
675
0 commit comments