Skip to content

Commit 0d672c1

Browse files
committed
SSE handlers support keep-alive
Closes gh-1048
1 parent 44ca9b0 commit 0d672c1

File tree

4 files changed

+31
-24
lines changed

4 files changed

+31
-24
lines changed

spring-graphql/src/main/java/org/springframework/graphql/server/webflux/GraphQlWebSocketHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ public GraphQlWebSocketHandler(
104104
* @param codecConfigurer codec configurer for JSON encoding and decoding
105105
* @param connectionInitTimeout how long to wait after the establishment of
106106
* the WebSocket for the {@code "connection_ini"} message from the client.
107-
* @param keepAliveDuration how frequently to send ping messages; if not
108-
* set then ping messages are not sent.
107+
* @param keepAliveDuration how frequently to send ping messages when no
108+
* other messages are sent
109109
* @since 1.3
110110
*/
111111
public GraphQlWebSocketHandler(

spring-graphql/src/main/java/org/springframework/graphql/server/webmvc/GraphQlSseHandler.java

+24-16
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import graphql.ExecutionResult;
2727
import graphql.GraphQLError;
2828
import graphql.GraphqlErrorBuilder;
29+
import org.apache.commons.logging.Log;
2930
import org.reactivestreams.Publisher;
3031
import reactor.core.publisher.BaseSubscriber;
3132
import reactor.core.publisher.Flux;
@@ -125,8 +126,8 @@ protected ServerResponse prepareResponse(
125126
});
126127

127128
return ((this.timeout != null) ?
128-
ServerResponse.sse(SseSubscriber.connect(resultFlux, this.keepAliveDuration), this.timeout) :
129-
ServerResponse.sse(SseSubscriber.connect(resultFlux, this.keepAliveDuration)));
129+
ServerResponse.sse(SseSubscriber.connect(resultFlux, this.logger, this.keepAliveDuration), this.timeout) :
130+
ServerResponse.sse(SseSubscriber.connect(resultFlux, this.logger, this.keepAliveDuration)));
130131
}
131132

132133

@@ -137,9 +138,12 @@ private static final class SseSubscriber extends BaseSubscriber<Map<String, Obje
137138

138139
private final ServerResponse.SseBuilder sseBuilder;
139140

140-
private SseSubscriber(ServerResponse.SseBuilder sseBuilder) {
141+
private final Log logger;
142+
143+
private SseSubscriber(ServerResponse.SseBuilder sseBuilder, Log logger) {
141144
this.sseBuilder = sseBuilder;
142145
this.sseBuilder.onTimeout(() -> cancelWithError(new AsyncRequestTimeoutException()));
146+
this.logger = logger;
143147
}
144148

145149
@Override
@@ -180,20 +184,24 @@ private void cancelWithError(Throwable ex) {
180184

181185
@Override
182186
protected void hookOnError(Throwable ex) {
183-
sendNext(exceptionToResultMap(ex));
187+
Map<String, Object> errorMap;
188+
if (ex instanceof SubscriptionPublisherException spe) {
189+
errorMap = spe.toMap();
190+
}
191+
else {
192+
if (this.logger.isErrorEnabled()) {
193+
this.logger.error("Unresolved " + ex.getClass().getSimpleName(), ex);
194+
}
195+
errorMap = GraphqlErrorBuilder.newError()
196+
.message("Subscription error")
197+
.errorType(org.springframework.graphql.execution.ErrorType.INTERNAL_ERROR)
198+
.build()
199+
.toSpecification();
200+
}
201+
sendNext(errorMap);
184202
sendComplete();
185203
}
186204

187-
private static Map<String, Object> exceptionToResultMap(Throwable ex) {
188-
return ((ex instanceof SubscriptionPublisherException spe) ?
189-
spe.toMap() :
190-
GraphqlErrorBuilder.newError()
191-
.message("Subscription error")
192-
.errorType(org.springframework.graphql.execution.ErrorType.INTERNAL_ERROR)
193-
.build()
194-
.toSpecification());
195-
}
196-
197205
private void sendComplete() {
198206
try {
199207
this.sseBuilder.event("complete").data("");
@@ -210,10 +218,10 @@ protected void hookOnComplete() {
210218
}
211219

212220
static Consumer<ServerResponse.SseBuilder> connect(
213-
Flux<Map<String, Object>> resultFlux, @Nullable Duration keepAliveDuration) {
221+
Flux<Map<String, Object>> resultFlux, Log logger, @Nullable Duration keepAliveDuration) {
214222

215223
return (sseBuilder) -> {
216-
SseSubscriber subscriber = new SseSubscriber(sseBuilder);
224+
SseSubscriber subscriber = new SseSubscriber(sseBuilder, logger);
217225
if (keepAliveDuration != null) {
218226
KeepAliveHandler handler = new KeepAliveHandler(keepAliveDuration);
219227
handler.compose(resultFlux).subscribe(subscriber);

spring-graphql/src/main/java/org/springframework/graphql/server/webmvc/GraphQlWebSocketHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ public GraphQlWebSocketHandler(
129129
* @param converter for JSON encoding and decoding
130130
* @param connectionInitTimeout how long to wait after the establishment of
131131
* the WebSocket for the {@code "connection_ini"} message from the client.
132-
* @param keepAliveDuration how frequently to send ping messages; if not
133-
* set then ping messages are not sent.
132+
* @param keepAliveDuration how frequently to send ping messages when no
133+
* other messages are sent
134134
* @since 1.3
135135
*/
136136
public GraphQlWebSocketHandler(

spring-graphql/src/test/java/org/springframework/graphql/server/webflux/GraphQlSseHandlerTests.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.springframework.mock.web.server.MockServerWebExchange;
4242
import org.springframework.web.reactive.function.server.ServerResponse;
4343
import org.springframework.web.reactive.result.view.ViewResolver;
44-
import org.springframework.web.server.ServerWebExchange;
4544

4645
import static org.assertj.core.api.Assertions.assertThat;
4746

@@ -181,9 +180,9 @@ private MockServerHttpResponse handleRequest(
181180

182181
MockServerRequest serverRequest = MockServerRequest.builder()
183182
.exchange(exchange)
184-
.uri(((ServerWebExchange) exchange).getRequest().getURI())
185-
.method(((ServerWebExchange) exchange).getRequest().getMethod())
186-
.headers(((ServerWebExchange) exchange).getRequest().getHeaders())
183+
.uri(exchange.getRequest().getURI())
184+
.method(exchange.getRequest().getMethod())
185+
.headers(exchange.getRequest().getHeaders())
187186
.body(Mono.just(body));
188187

189188
handler.handleRequest(serverRequest)

0 commit comments

Comments
 (0)