17
17
package org .springframework .graphql .server .webflux ;
18
18
19
19
20
+ import java .time .Duration ;
20
21
import java .util .Collections ;
21
22
import java .util .Map ;
22
23
32
33
import org .springframework .graphql .server .WebGraphQlResponse ;
33
34
import org .springframework .http .MediaType ;
34
35
import org .springframework .http .codec .ServerSentEvent ;
36
+ import org .springframework .lang .Nullable ;
35
37
import org .springframework .web .reactive .function .BodyInserters ;
36
38
import org .springframework .web .reactive .function .server .ServerRequest ;
37
39
import org .springframework .web .reactive .function .server .ServerResponse ;
@@ -51,9 +53,28 @@ public class GraphQlSseHandler extends AbstractGraphQlHttpHandler {
51
53
private static final Mono <ServerSentEvent <Map <String , Object >>> COMPLETE_EVENT = Mono .just (
52
54
ServerSentEvent .<Map <String , Object >>builder (Collections .emptyMap ()).event ("complete" ).build ());
53
55
56
+ @ Nullable
57
+ private final Duration timeout ;
54
58
59
+
60
+ /**
61
+ * Constructor with the handler to delegate to, and no timeout by default,
62
+ * which results in never timing out.
63
+ * @param graphQlHandler the handler to delegate to
64
+ */
55
65
public GraphQlSseHandler (WebGraphQlHandler graphQlHandler ) {
66
+ this (graphQlHandler , null );
67
+ }
68
+
69
+ /**
70
+ * Variant constructor with a timeout to use for SSE subscriptions.
71
+ * @param graphQlHandler the handler to delegate to
72
+ * @param timeout the timeout value to use or {@code null} to never time out
73
+ * @since 1.3.3
74
+ */
75
+ public GraphQlSseHandler (WebGraphQlHandler graphQlHandler , @ Nullable Duration timeout ) {
56
76
super (graphQlHandler , null );
77
+ this .timeout = timeout ;
57
78
}
58
79
59
80
@@ -83,10 +104,12 @@ protected Mono<ServerResponse> prepareResponse(ServerRequest request, WebGraphQl
83
104
Flux <ServerSentEvent <Map <String , Object >>> sseFlux =
84
105
resultFlux .map ((event ) -> ServerSentEvent .builder (event ).event ("next" ).build ());
85
106
86
- return ServerResponse .ok ()
107
+ Mono < ServerResponse > responseMono = ServerResponse .ok ()
87
108
.contentType (MediaType .TEXT_EVENT_STREAM )
88
109
.body (BodyInserters .fromServerSentEvents (sseFlux .concatWith (COMPLETE_EVENT )))
89
110
.onErrorResume (Throwable .class , (ex ) -> ServerResponse .badRequest ().build ());
111
+
112
+ return ((this .timeout != null ) ? responseMono .timeout (this .timeout ) : responseMono );
90
113
}
91
114
92
115
}
0 commit comments