Skip to content

Commit 6e55e78

Browse files
committed
WebFlux support for SSE Fragment stream
See gh-33194
1 parent aa6b47b commit 6e55e78

File tree

2 files changed

+118
-1
lines changed

2 files changed

+118
-1
lines changed

spring-webflux/src/main/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandler.java

+88-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.web.reactive.result.view;
1818

19+
import java.nio.charset.Charset;
20+
import java.nio.charset.StandardCharsets;
1921
import java.util.ArrayList;
2022
import java.util.Collection;
2123
import java.util.Collections;
@@ -38,9 +40,11 @@
3840
import org.springframework.core.ResolvableType;
3941
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
4042
import org.springframework.core.io.buffer.DataBuffer;
43+
import org.springframework.core.io.buffer.DataBufferFactory;
4144
import org.springframework.http.HttpHeaders;
4245
import org.springframework.http.HttpStatusCode;
4346
import org.springframework.http.MediaType;
47+
import org.springframework.http.server.reactive.ServerHttpRequest;
4448
import org.springframework.http.server.reactive.ServerHttpResponse;
4549
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
4650
import org.springframework.lang.Nullable;
@@ -96,6 +100,8 @@ public class ViewResolutionResultHandler extends HandlerResultHandlerSupport imp
96100

97101
private final List<View> defaultViews = new ArrayList<>(4);
98102

103+
private final List<FragmentFormatter> fragmentFormatters = List.of(new SseFragmentFormatter());
104+
99105

100106
/**
101107
* Basic constructor with a default {@link ReactiveAdapterRegistry}.
@@ -337,8 +343,22 @@ private Mono<Flux<DataBuffer>> renderFragment(
337343
Mono.just(List.of(fragment.view())) :
338344
resolveViews(fragment.viewName() != null ? fragment.viewName() : getDefaultViewName(exchange), locale));
339345

346+
FragmentFormatter fragmentFormatter = getFragmentFormatter(exchange);
347+
340348
return selectedViews.flatMap(views -> render(views, fragment.model(), bindingContext, mutatedExchange))
341-
.then(Mono.fromSupplier(response::getBodyFlux));
349+
.then(Mono.fromSupplier(() -> (fragmentFormatter != null ?
350+
fragmentFormatter.format(response.getBodyFlux(), fragment, exchange) :
351+
response.getBodyFlux())));
352+
}
353+
354+
@Nullable
355+
private FragmentFormatter getFragmentFormatter(ServerWebExchange exchange) {
356+
for (FragmentFormatter formatter : this.fragmentFormatters) {
357+
if (formatter.supports(exchange.getRequest())) {
358+
return formatter;
359+
}
360+
}
361+
return null;
342362
}
343363

344364
private String getNameForReturnValue(MethodParameter returnType) {
@@ -436,4 +456,71 @@ public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends Data
436456
}
437457
}
438458

459+
460+
/**
461+
* Strategy to render fragment with stream formatting.
462+
*/
463+
private interface FragmentFormatter {
464+
465+
/**
466+
* Whether the formatter supports the given request.
467+
*/
468+
boolean supports(ServerHttpRequest request);
469+
470+
/**
471+
* Format the given fragment.
472+
* @param fragmentBuffers the fragment serialized to data buffers
473+
* @param fragment the fragment being rendered
474+
* @param exchange the current exchange
475+
* @return the formatted fragment
476+
*/
477+
Flux<DataBuffer> format(Flux<DataBuffer> fragmentBuffers, Fragment fragment, ServerWebExchange exchange);
478+
479+
}
480+
481+
482+
/**
483+
* Formatter for Server-Sent Events formatting.
484+
*/
485+
private static class SseFragmentFormatter implements FragmentFormatter {
486+
487+
@Override
488+
public boolean supports(ServerHttpRequest request) {
489+
String header = request.getHeaders().getFirst(HttpHeaders.ACCEPT);
490+
return (header != null && header.contains(MediaType.TEXT_EVENT_STREAM_VALUE));
491+
}
492+
493+
@Override
494+
public Flux<DataBuffer> format(
495+
Flux<DataBuffer> fragmentBuffers, Fragment fragment, ServerWebExchange exchange) {
496+
497+
Charset charset = getCharset(exchange.getRequest());
498+
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
499+
500+
String eventLine = fragment.viewName() != null ? "event:" + fragment.viewName() + "\n" : "";
501+
502+
return Flux.concat(
503+
Flux.just(encodeText(eventLine + "data:", charset, bufferFactory)),
504+
fragmentBuffers,
505+
Flux.just(encodeText("\n\n", charset, bufferFactory)));
506+
}
507+
508+
private Charset getCharset(ServerHttpRequest request) {
509+
for (MediaType mediaType : request.getHeaders().getAccept()) {
510+
if (mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
511+
if (mediaType.getCharset() != null) {
512+
return mediaType.getCharset();
513+
}
514+
break;
515+
}
516+
}
517+
return StandardCharsets.UTF_8;
518+
}
519+
520+
private DataBuffer encodeText(String text, Charset charset, DataBufferFactory bufferFactory) {
521+
byte[] bytes = text.getBytes(charset);
522+
return bufferFactory.wrap(bytes);
523+
}
524+
}
525+
439526
}

spring-webflux/src/test/java/org/springframework/web/reactive/result/view/FragmentViewResolutionResultHandlerTests.java

+30
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,35 @@ void render(Object returnValue, MethodParameter parameter) {
9090
assertThat(body).isEqualTo("<p>Hello Foo</p><p>Hello Bar</p>");
9191
}
9292

93+
@Test
94+
void renderSse() {
95+
MockServerHttpRequest request = MockServerHttpRequest.get("/")
96+
.accept(MediaType.TEXT_EVENT_STREAM)
97+
.acceptLanguageAsLocales(Locale.ENGLISH)
98+
.build();
99+
100+
MockServerWebExchange exchange = MockServerWebExchange.from(request);
101+
102+
HandlerResult result = new HandlerResult(
103+
new Handler(),
104+
Flux.just(fragment1, fragment2).subscribeOn(Schedulers.boundedElastic()),
105+
on(Handler.class).resolveReturnType(Flux.class, Fragment.class),
106+
new BindingContext());
107+
108+
String body = initHandler().handleResult(exchange, result)
109+
.then(Mono.defer(() -> exchange.getResponse().getBodyAsString()))
110+
.block(Duration.ofSeconds(60));
111+
112+
assertThat(body).isEqualTo("""
113+
event:fragment1
114+
data:<p>Hello Foo</p>
115+
116+
event:fragment2
117+
data:<p>Hello Bar</p>
118+
119+
""");
120+
}
121+
93122
private ViewResolutionResultHandler initHandler() {
94123

95124
AnnotationConfigApplicationContext context =
@@ -98,6 +127,7 @@ private ViewResolutionResultHandler initHandler() {
98127
String prefix = "org/springframework/web/reactive/result/view/script/kotlin/";
99128
ScriptTemplateViewResolver viewResolver = new ScriptTemplateViewResolver(prefix, ".kts");
100129
viewResolver.setApplicationContext(context);
130+
viewResolver.setSupportedMediaTypes(List.of(MediaType.TEXT_HTML, MediaType.TEXT_EVENT_STREAM));
101131

102132
RequestedContentTypeResolver contentTypeResolver = new HeaderContentTypeResolver();
103133
return new ViewResolutionResultHandler(List.of(viewResolver), contentTypeResolver);

0 commit comments

Comments
 (0)