Skip to content

Commit 00b88ec

Browse files
dmitrysulmansdeleuze
authored andcommitted
Propagate CoroutineContext to WebClient filter
This commit introduces a new ResponseSpec.awaitEntityOrNull() extension function to replace ResponseSpec.toEntity(...).awaitFirstOrNull() and pass the CoroutineContext to the CoExchangeFilterFunction. CoroutineContext propagation is implemented via ReactorContext and ClientRequest attribute. Closes gh-34555 Signed-off-by: Dmitry Sulman <[email protected]>
1 parent 3872c1a commit 00b88ec

File tree

4 files changed

+299
-19
lines changed

4 files changed

+299
-19
lines changed

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

+5
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@
7171
*/
7272
final class DefaultWebClient implements WebClient {
7373

74+
// Copy of CoExchangeFilterFunction.COROUTINE_CONTEXT_ATTRIBUTE value to avoid compilation errors in Eclipse
75+
private static final String COROUTINE_CONTEXT_ATTRIBUTE = "org.springframework.web.reactive.function.client.CoExchangeFilterFunction.context";
76+
7477
private static final String URI_TEMPLATE_ATTRIBUTE = WebClient.class.getName() + ".uriTemplate";
7578

7679
private static final Mono<ClientResponse> NO_HTTP_CLIENT_RESPONSE_ERROR = Mono.error(
@@ -430,6 +433,8 @@ private Mono<ClientResponse> exchange() {
430433
if (filterFunctions != null) {
431434
filterFunction = filterFunctions.andThen(filterFunction);
432435
}
436+
contextView.getOrEmpty(COROUTINE_CONTEXT_ATTRIBUTE)
437+
.ifPresent(context -> requestBuilder.attribute(COROUTINE_CONTEXT_ATTRIBUTE, context));
433438
ClientRequest request = requestBuilder.build();
434439
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
435440
observationContext.setRequest(request);

spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/CoExchangeFilterFunction.kt

+22-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,9 +17,13 @@
1717
package org.springframework.web.reactive.function.client
1818

1919
import kotlinx.coroutines.Dispatchers
20+
import kotlinx.coroutines.Job
21+
import kotlinx.coroutines.currentCoroutineContext
2022
import kotlinx.coroutines.reactor.awaitSingle
2123
import kotlinx.coroutines.reactor.mono
2224
import reactor.core.publisher.Mono
25+
import kotlin.coroutines.CoroutineContext
26+
import kotlin.jvm.optionals.getOrNull
2327

2428
/**
2529
* Kotlin-specific implementation of the [ExchangeFilterFunction] interface
@@ -31,10 +35,14 @@ import reactor.core.publisher.Mono
3135
abstract class CoExchangeFilterFunction : ExchangeFilterFunction {
3236

3337
final override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
34-
return mono(Dispatchers.Unconfined) {
38+
val context = request.attribute(COROUTINE_CONTEXT_ATTRIBUTE).getOrNull() as CoroutineContext?
39+
return mono(context ?: Dispatchers.Unconfined) {
3540
filter(request, object : CoExchangeFunction {
3641
override suspend fun exchange(request: ClientRequest): ClientResponse {
37-
return next.exchange(request).awaitSingle()
42+
val newRequest = ClientRequest.from(request)
43+
.attribute(COROUTINE_CONTEXT_ATTRIBUTE, currentCoroutineContext().minusKey(Job.Key))
44+
.build()
45+
return next.exchange(newRequest).awaitSingle()
3846
}
3947
})
4048
}
@@ -58,6 +66,17 @@ abstract class CoExchangeFilterFunction : ExchangeFilterFunction {
5866
* @return the filtered response
5967
*/
6068
protected abstract suspend fun filter(request: ClientRequest, next: CoExchangeFunction): ClientResponse
69+
70+
companion object {
71+
72+
/**
73+
* Name of the [ClientRequest] attribute that contains the
74+
* [kotlin.coroutines.CoroutineContext] to be passed to the
75+
* [CoExchangeFilterFunction.filter].
76+
*/
77+
@JvmField
78+
val COROUTINE_CONTEXT_ATTRIBUTE = CoExchangeFilterFunction::class.java.name + ".context"
79+
}
6180
}
6281

6382

spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt

+47-16
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,26 @@ import kotlinx.coroutines.Job
2020
import kotlinx.coroutines.currentCoroutineContext
2121
import kotlinx.coroutines.flow.Flow
2222
import kotlinx.coroutines.reactive.asFlow
23-
import kotlinx.coroutines.reactor.asFlux
24-
import kotlinx.coroutines.reactor.awaitSingle
25-
import kotlinx.coroutines.reactor.awaitSingleOrNull
26-
import kotlinx.coroutines.reactor.mono
23+
import kotlinx.coroutines.reactor.*
24+
import kotlinx.coroutines.withContext
2725
import org.reactivestreams.Publisher
2826
import org.springframework.core.ParameterizedTypeReference
2927
import org.springframework.http.ResponseEntity
28+
import org.springframework.web.reactive.function.client.CoExchangeFilterFunction.Companion.COROUTINE_CONTEXT_ATTRIBUTE
3029
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec
3130
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec
3231
import reactor.core.publisher.Flux
3332
import reactor.core.publisher.Mono
33+
import reactor.util.context.Context
34+
import kotlin.coroutines.CoroutineContext
3435

3536
/**
3637
* Extension for [WebClient.RequestBodySpec.body] providing a `body(Publisher<T>)` variant
3738
* leveraging Kotlin reified type parameters. This extension is not subject to type
3839
* erasure and retains actual generic type arguments.
3940
*
4041
* @author Sebastien Deleuze
42+
* @author Dmitry Sulman
4143
* @since 5.0
4244
*/
4345
inline fun <reified T : Any, S : Publisher<T>> RequestBodySpec.body(publisher: S): RequestHeadersSpec<*> =
@@ -89,7 +91,7 @@ inline fun <reified T : Any> RequestBodySpec.bodyValueWithType(body: T): Request
8991
*/
9092
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(responseHandler: suspend (ClientResponse) -> T): T {
9193
val context = currentCoroutineContext().minusKey(Job.Key)
92-
return exchangeToMono { mono(context) { responseHandler.invoke(it) } }.awaitSingle()
94+
return withContext(context.toReactorContext()) { exchangeToMono { mono(context) { responseHandler.invoke(it) } }.awaitSingle() }
9395
}
9496

9597
/**
@@ -99,7 +101,7 @@ suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange
99101
*/
100102
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchangeOrNull(responseHandler: suspend (ClientResponse) -> T?): T? {
101103
val context = currentCoroutineContext().minusKey(Job.Key)
102-
return exchangeToMono { mono(context) { responseHandler.invoke(it) } }.awaitSingleOrNull()
104+
return withContext(context.toReactorContext()) { exchangeToMono { mono(context) { responseHandler.invoke(it) } }.awaitSingleOrNull() }
103105
}
104106

105107
/**
@@ -150,29 +152,39 @@ inline fun <reified T : Any> WebClient.ResponseSpec.bodyToFlow(): Flow<T> =
150152
* @author Sebastien Deleuze
151153
* @since 5.2
152154
*/
153-
suspend inline fun <reified T : Any> WebClient.ResponseSpec.awaitBody() : T =
154-
when (T::class) {
155-
Unit::class -> awaitBodilessEntity().let { Unit as T }
156-
else -> bodyToMono<T>().awaitSingle()
155+
suspend inline fun <reified T : Any> WebClient.ResponseSpec.awaitBody() : T {
156+
val context = currentCoroutineContext().minusKey(Job.Key)
157+
return withContext(context.toReactorContext()) {
158+
when (T::class) {
159+
Unit::class -> toBodilessEntity().awaitSingle().let { Unit as T }
160+
else -> bodyToMono<T>().awaitSingle()
161+
}
157162
}
163+
}
158164

159165
/**
160166
* Coroutines variant of [WebClient.ResponseSpec.bodyToMono].
161167
*
162168
* @author Valentin Shakhov
163169
* @since 5.3.6
164170
*/
165-
suspend inline fun <reified T : Any> WebClient.ResponseSpec.awaitBodyOrNull() : T? =
166-
when (T::class) {
167-
Unit::class -> awaitBodilessEntity().let { Unit as T? }
168-
else -> bodyToMono<T>().awaitSingleOrNull()
171+
suspend inline fun <reified T : Any> WebClient.ResponseSpec.awaitBodyOrNull() : T? {
172+
val context = currentCoroutineContext().minusKey(Job.Key)
173+
return withContext(context.toReactorContext()) {
174+
when (T::class) {
175+
Unit::class -> toBodilessEntity().awaitSingle().let { Unit as T? }
176+
else -> bodyToMono<T>().awaitSingleOrNull()
177+
}
169178
}
179+
}
170180

171181
/**
172182
* Coroutines variant of [WebClient.ResponseSpec.toBodilessEntity].
173183
*/
174-
suspend fun WebClient.ResponseSpec.awaitBodilessEntity() =
175-
toBodilessEntity().awaitSingle()
184+
suspend fun WebClient.ResponseSpec.awaitBodilessEntity(): ResponseEntity<Void> {
185+
val context = currentCoroutineContext().minusKey(Job.Key)
186+
return withContext(context.toReactorContext()) { toBodilessEntity().awaitSingle() }
187+
}
176188

177189
/**
178190
* Extension for [WebClient.ResponseSpec.toEntity] providing a `toEntity<Foo>()` variant
@@ -203,3 +215,22 @@ inline fun <reified T : Any> WebClient.ResponseSpec.toEntityList(): Mono<Respons
203215
*/
204216
inline fun <reified T : Any> WebClient.ResponseSpec.toEntityFlux(): Mono<ResponseEntity<Flux<T>>> =
205217
toEntityFlux(object : ParameterizedTypeReference<T>() {})
218+
219+
/**
220+
* Extension for [WebClient.ResponseSpec.toEntity] providing a `toEntity<Foo>()` variant
221+
* leveraging Kotlin reified type parameters and allows [kotlin.coroutines.CoroutineContext]
222+
* propagation to the [CoExchangeFilterFunction]. This extension is not subject to type erasure
223+
* and retains actual generic type arguments.
224+
*
225+
* @since 7.0.0
226+
*/
227+
suspend inline fun <reified T : Any> WebClient.ResponseSpec.awaitEntity(): ResponseEntity<T> {
228+
val context = currentCoroutineContext().minusKey(Job.Key)
229+
return withContext(context.toReactorContext()) { toEntity(T::class.java).awaitSingle() }
230+
}
231+
232+
@PublishedApi
233+
internal fun CoroutineContext.toReactorContext(): ReactorContext {
234+
val context = Context.of(COROUTINE_CONTEXT_ATTRIBUTE, this).readOnly()
235+
return (this[ReactorContext.Key]?.context?.putAll(context) ?: context).asCoroutineContext()
236+
}

0 commit comments

Comments
 (0)