Skip to content

Commit 4a77dbc

Browse files
authored
GH-9259: Fix Reactor context propagation on reactive reply (#9284)
Fixes: #9259 The `Mono.toFuture()` does not propagate context to thread locals of the `CompletableFuture` consumer. See `MonoToCompletableFuture` * Fix `AbstractMessageProducingHandler` to convert reply `Mono` to `CompletableFuture` manually. Use `doOnEach()` and set thread locals from the Reactor context manually around `replyFuture.complete()/completeExceptionally()` * Add respective unit test into `WebFluxObservationPropagationTests` to ensure that same trace is used in downstream endpoints after WebFlux client reply **Auto-cherry-pick to `6.3.x` & `6.2.x`**
1 parent 7a371d6 commit 4a77dbc

File tree

2 files changed

+77
-2
lines changed

2 files changed

+77
-2
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2023 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
3030
import java.util.function.BiConsumer;
3131

3232
import org.reactivestreams.Publisher;
33+
import reactor.core.Exceptions;
3334
import reactor.core.publisher.Flux;
3435
import reactor.core.publisher.Mono;
3536
import reactor.core.scheduler.Schedulers;
@@ -47,6 +48,7 @@
4748
import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
4849
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4950
import org.springframework.integration.support.utils.IntegrationUtils;
51+
import org.springframework.integration.util.IntegrationReactiveUtils;
5052
import org.springframework.lang.Nullable;
5153
import org.springframework.messaging.Message;
5254
import org.springframework.messaging.MessageChannel;
@@ -366,6 +368,7 @@ private static Publisher<?> toPublisherReply(Object reply, @Nullable ReactiveAda
366368
}
367369
}
368370

371+
@SuppressWarnings("try")
369372
private static CompletableFuture<?> toFutureReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) {
370373
if (reactiveAdapter != null) {
371374
Mono<?> reactiveReply;
@@ -377,7 +380,31 @@ private static CompletableFuture<?> toFutureReply(Object reply, @Nullable Reacti
377380
reactiveReply = Mono.from(publisher);
378381
}
379382

380-
return reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture();
383+
CompletableFuture<Object> replyFuture = new CompletableFuture<>();
384+
385+
reactiveReply
386+
.publishOn(Schedulers.boundedElastic())
387+
// TODO until Reactor supports context propagation from the MonoToCompletableFuture
388+
.doOnEach((signal) -> {
389+
try (AutoCloseable scope = IntegrationReactiveUtils
390+
.setThreadLocalsFromReactorContext(signal.getContextView())) {
391+
392+
if (signal.isOnError()) {
393+
replyFuture.completeExceptionally(signal.getThrowable());
394+
}
395+
else {
396+
replyFuture.complete(signal.get());
397+
}
398+
399+
}
400+
catch (Exception ex) {
401+
throw Exceptions.bubble(ex);
402+
}
403+
})
404+
.contextCapture()
405+
.subscribe();
406+
407+
return replyFuture;
381408
}
382409
else {
383410
return toCompletableFuture(reply);

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java

+48
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,33 @@
3636
import io.micrometer.tracing.test.simple.SpansAssert;
3737
import org.junit.jupiter.api.BeforeEach;
3838
import org.junit.jupiter.api.Test;
39+
import reactor.core.publisher.Mono;
3940

4041
import org.springframework.beans.factory.annotation.Autowired;
4142
import org.springframework.beans.factory.annotation.Qualifier;
4243
import org.springframework.context.ApplicationContext;
4344
import org.springframework.context.annotation.Bean;
4445
import org.springframework.context.annotation.Configuration;
46+
import org.springframework.core.io.buffer.DataBuffer;
4547
import org.springframework.http.HttpMethod;
48+
import org.springframework.http.HttpStatus;
49+
import org.springframework.http.client.reactive.ClientHttpConnector;
4650
import org.springframework.http.server.reactive.HttpHandler;
4751
import org.springframework.integration.channel.FluxMessageChannel;
4852
import org.springframework.integration.config.EnableIntegration;
4953
import org.springframework.integration.config.EnableIntegrationManagement;
54+
import org.springframework.integration.core.MessagingTemplate;
5055
import org.springframework.integration.dsl.IntegrationFlow;
5156
import org.springframework.integration.webflux.dsl.WebFlux;
5257
import org.springframework.messaging.Message;
58+
import org.springframework.messaging.MessageChannel;
5359
import org.springframework.messaging.PollableChannel;
5460
import org.springframework.test.annotation.DirtiesContext;
5561
import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig;
5662
import org.springframework.test.web.reactive.server.HttpHandlerConnector;
5763
import org.springframework.test.web.reactive.server.WebTestClient;
5864
import org.springframework.web.reactive.config.EnableWebFlux;
65+
import org.springframework.web.reactive.function.client.WebClient;
5966
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
6067

6168
import static org.assertj.core.api.Assertions.assertThat;
@@ -81,6 +88,10 @@ public class WebFluxObservationPropagationTests {
8188
@Autowired
8289
private PollableChannel testChannel;
8390

91+
@Autowired
92+
@Qualifier("webFluxRequestReplyClientFlow.input")
93+
private MessageChannel webFluxRequestReplyClientFlowInput;
94+
8495
@BeforeEach
8596
void setup() {
8697
SPANS.clear();
@@ -122,6 +133,20 @@ void observationIsPropagatedWebFluxRequestReply() {
122133
.haveSameTraceId();
123134
}
124135

136+
@Test
137+
void observationIsPropagatedWebFluxClientRequestReply() {
138+
String result =
139+
new MessagingTemplate()
140+
.convertSendAndReceive(this.webFluxRequestReplyClientFlowInput, "test", String.class);
141+
142+
assertThat(result).isEqualTo("SOME REPLY");
143+
144+
// There is a race condition when we already have a reply, but the span in the last channel is not closed yet.
145+
await().untilAsserted(() -> assertThat(SPANS.spans()).hasSize(5));
146+
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
147+
.haveSameTraceId();
148+
}
149+
125150
@Configuration
126151
@EnableWebFlux
127152
@EnableIntegration
@@ -188,6 +213,29 @@ FluxMessageChannel webFluxRequestChannel() {
188213
return new FluxMessageChannel();
189214
}
190215

216+
@Bean
217+
IntegrationFlow webFluxRequestReplyClientFlow(ObservationRegistry registry) {
218+
ClientHttpConnector httpConnector =
219+
new HttpHandlerConnector((request, response) -> {
220+
response.setStatusCode(HttpStatus.OK);
221+
222+
Mono<DataBuffer> replyData = Mono.just(response.bufferFactory().wrap("some reply".getBytes()));
223+
224+
return response.writeWith(replyData)
225+
.then(Mono.defer(response::setComplete));
226+
});
227+
WebClient webClient =
228+
WebClient.builder()
229+
.clientConnector(httpConnector)
230+
.observationRegistry(registry)
231+
.build();
232+
233+
return f -> f
234+
.handle(WebFlux.outboundGateway(message -> "/someRequest", webClient)
235+
.expectedResponseType(String.class))
236+
.<String, String>transform(String::toUpperCase);
237+
}
238+
191239
@Bean
192240
IntegrationFlow webFluxRequestReplyFlow(
193241
@Qualifier("webFluxRequestChannel") FluxMessageChannel webFluxRequestChannel) {

0 commit comments

Comments
 (0)