Skip to content

Commit f9a19d4

Browse files
Mateusz Rzeszutektrask
authored andcommitted
Migrate Spring Webflux HTTP client instrumentation to Instrumenter API (open-telemetry#4517)
* Migrate Spring Webflux HTTP client instrumentation to Instrumenter API * Update instrumentation/spring/spring-webflux-5.0/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/SpringWebfluxHttpAttributesExtractor.java Co-authored-by: Trask Stalnaker <[email protected]>
1 parent 6638ecb commit f9a19d4

File tree

13 files changed

+407
-191
lines changed

13 files changed

+407
-191
lines changed

instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/httpclients/webclient/WebClientBeanPostProcessor.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@
66
package io.opentelemetry.instrumentation.spring.autoconfigure.httpclients.webclient;
77

88
import io.opentelemetry.api.OpenTelemetry;
9-
import io.opentelemetry.instrumentation.spring.webflux.client.WebClientTracingFilter;
10-
import java.util.List;
11-
import java.util.function.Consumer;
9+
import io.opentelemetry.instrumentation.spring.webflux.client.SpringWebfluxTracing;
1210
import org.springframework.beans.factory.ObjectProvider;
1311
import org.springframework.beans.factory.config.BeanPostProcessor;
14-
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
1512
import org.springframework.web.reactive.function.client.WebClient;
1613

1714
/**
@@ -44,18 +41,10 @@ private static WebClient.Builder wrapBuilder(
4441

4542
OpenTelemetry openTelemetry = openTelemetryProvider.getIfUnique();
4643
if (openTelemetry != null) {
47-
return webClientBuilder.filters(webClientFilterFunctionConsumer(openTelemetry));
44+
SpringWebfluxTracing instrumentation = SpringWebfluxTracing.create(openTelemetry);
45+
return webClientBuilder.filters(instrumentation::addClientTracingFilter);
4846
} else {
4947
return webClientBuilder;
5048
}
5149
}
52-
53-
private static Consumer<List<ExchangeFilterFunction>> webClientFilterFunctionConsumer(
54-
OpenTelemetry openTelemetry) {
55-
return functions -> {
56-
if (functions.stream().noneMatch(filter -> filter instanceof WebClientTracingFilter)) {
57-
WebClientTracingFilter.addFilter(openTelemetry, functions);
58-
}
59-
};
60-
}
6150
}

instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/httpclients/webclient/WebClientBeanPostProcessorTest.java

+8-13
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
import static org.mockito.Mockito.when;
1313

1414
import io.opentelemetry.api.OpenTelemetry;
15-
import io.opentelemetry.instrumentation.spring.webflux.client.WebClientTracingFilter;
1615
import org.junit.jupiter.api.BeforeEach;
1716
import org.junit.jupiter.api.DisplayName;
1817
import org.junit.jupiter.api.Test;
1918
import org.junit.jupiter.api.extension.ExtendWith;
2019
import org.mockito.Mock;
2120
import org.mockito.junit.jupiter.MockitoExtension;
2221
import org.springframework.beans.factory.ObjectProvider;
22+
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
2323
import org.springframework.web.reactive.function.client.WebClient;
2424

2525
@ExtendWith(MockitoExtension.class)
@@ -86,10 +86,7 @@ void addsExchangeFilterWebClient() {
8686
.mutate()
8787
.filters(
8888
functions ->
89-
assertThat(
90-
functions.stream()
91-
.filter(wctf -> wctf instanceof WebClientTracingFilter)
92-
.count())
89+
assertThat(functions.stream().filter(wctf -> isOtelExchangeFilter(wctf)).count())
9390
.isEqualTo(1));
9491

9592
verify(openTelemetryProvider).getIfUnique();
@@ -110,10 +107,7 @@ void doesNotAddExchangeFilterWebClientIfOpenTelemetryUnavailable() {
110107
.mutate()
111108
.filters(
112109
functions ->
113-
assertThat(
114-
functions.stream()
115-
.filter(wctf -> wctf instanceof WebClientTracingFilter)
116-
.count())
110+
assertThat(functions.stream().filter(wctf -> isOtelExchangeFilter(wctf)).count())
117111
.isEqualTo(0));
118112

119113
verify(openTelemetryProvider).getIfUnique();
@@ -135,12 +129,13 @@ void addsExchangeFilterWebClientBuilder() {
135129

136130
webClientBuilder.filters(
137131
functions ->
138-
assertThat(
139-
functions.stream()
140-
.filter(wctf -> wctf instanceof WebClientTracingFilter)
141-
.count())
132+
assertThat(functions.stream().filter(wctf -> isOtelExchangeFilter(wctf)).count())
142133
.isEqualTo(1));
143134

144135
verify(openTelemetryProvider, times(3)).getIfUnique();
145136
}
137+
138+
private static boolean isOtelExchangeFilter(ExchangeFilterFunction wctf) {
139+
return wctf.getClass().getName().startsWith("io.opentelemetry.instrumentation");
140+
}
146141
}

instrumentation/spring/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/client/WebClientHelper.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@
66
package io.opentelemetry.javaagent.instrumentation.spring.webflux.client;
77

88
import io.opentelemetry.api.GlobalOpenTelemetry;
9-
import io.opentelemetry.instrumentation.spring.webflux.client.WebClientTracingFilter;
9+
import io.opentelemetry.instrumentation.spring.webflux.client.SpringWebfluxTracing;
1010
import java.util.List;
1111
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
1212

1313
public class WebClientHelper {
1414

15+
private static final SpringWebfluxTracing INSTRUMENTATION =
16+
SpringWebfluxTracing.create(GlobalOpenTelemetry.get());
17+
1518
public static void addFilter(List<ExchangeFilterFunction> exchangeFilterFunctions) {
16-
WebClientTracingFilter.addFilter(GlobalOpenTelemetry.get(), exchangeFilterFunctions);
19+
INSTRUMENTATION.addClientTracingFilter(exchangeFilterFunctions);
1720
}
1821
}

instrumentation/spring/spring-webflux-5.0/javaagent/src/test/groovy/client/SpringWebfluxHttpClientTest.groovy

+9
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
package client
77

88
import io.netty.channel.ChannelOption
9+
import io.opentelemetry.api.common.AttributeKey
910
import io.opentelemetry.instrumentation.test.AgentTestTrait
1011
import io.opentelemetry.instrumentation.test.base.HttpClientTest
1112
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
1213
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
14+
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
1315
import org.springframework.http.HttpMethod
1416
import org.springframework.http.client.reactive.ReactorClientHttpConnector
1517
import org.springframework.web.reactive.function.client.WebClient
@@ -78,6 +80,13 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
7880
false
7981
}
8082

83+
@Override
84+
Set<AttributeKey<?>> httpAttributes(URI uri) {
85+
def attributes = super.httpAttributes(uri)
86+
attributes.remove(SemanticAttributes.HTTP_FLAVOR)
87+
return attributes
88+
}
89+
8190
@Override
8291
SingleConnection createSingleConnection(String host, int port) {
8392
return new SpringWebFluxSingleConnection(isOldVersion(), host, port)

instrumentation/spring/spring-webflux-5.0/library/README.md

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Manual Instrumentation for Spring Webflux
22

3-
Provides OpenTelemetry instrumentation for Spring's WebClient.
3+
Provides OpenTelemetry instrumentation for Spring's `WebClient`.
44

55
## Quickstart
66

@@ -10,8 +10,7 @@ Replace `SPRING_VERSION` with the version of spring you're using.
1010
`Minimum version: 5.0`
1111

1212
Replace `OPENTELEMETRY_VERSION` with the latest stable [release](https://mvnrepository.com/artifact/io.opentelemetry).
13-
`Minimum version: 0.8.0`
14-
13+
`Minimum version: 1.8.0`
1514

1615
For Maven add to your `pom.xml`:
1716

@@ -60,17 +59,18 @@ implementation("org.springframework:spring-webflux:SPRING_VERSION")
6059

6160
### Features
6261

63-
#### WebClientTracingFilter
62+
#### `SpringWebfluxTracing`
6463

65-
WebClientTracingFilter adds OpenTelemetry client spans to requests sent using WebClient by implementing the [ExchangeFilterFunction](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/function/client/ExchangeFilterFunction.html)
64+
`SpringWebfluxTracing` emits client span for each request sent using `WebClient` by implementing
65+
the [ExchangeFilterFunction](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/function/client/ExchangeFilterFunction.html)
6666
interface. An example is shown below:
6767

6868
##### Usage
6969

7070
```java
7171

72-
import io.opentelemetry.instrumentation.spring.webflux.client.WebClientTracingFilter
73-
import io.opentelemetry.api.trace.Tracer;
72+
import io.opentelemetry.api.OpenTelemetry;
73+
import io.opentelemetry.instrumentation.spring.webflux.client.SpringWebfluxTracing;
7474

7575
import org.springframework.beans.factory.annotation.Autowired;
7676
import org.springframework.context.annotation.Bean;
@@ -81,12 +81,12 @@ import org.springframework.web.reactive.function.client.WebClient;
8181
public class WebClientConfig {
8282

8383
@Bean
84-
public WebClient.Builder webClient(Tracer tracer) {
84+
public WebClient.Builder webClient(OpenTelemetry openTelemetry) {
8585

8686
WebClient webClient = WebClient.create();
87-
WebClientTracingFilter webClientTracingFilter = new WebClientTracingFilter(tracer);
87+
SpringWebfluxTracing instrumentation = SpringWebfluxTracing.create(openTelemetry);
8888

89-
return webClient.mutate().filter(webClientTracingFilter);
89+
return webClient.mutate().filter(instrumentation::addClientTracingFilter);
9090
}
9191
}
9292
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.spring.webflux.client;
7+
8+
import static io.opentelemetry.api.common.AttributeKey.stringKey;
9+
10+
import io.opentelemetry.api.common.AttributeKey;
11+
import io.opentelemetry.api.common.AttributesBuilder;
12+
import io.opentelemetry.instrumentation.api.config.Config;
13+
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
14+
import javax.annotation.Nullable;
15+
import org.springframework.web.reactive.function.client.ClientRequest;
16+
import org.springframework.web.reactive.function.client.ClientResponse;
17+
18+
final class SpringWebfluxExperimentalAttributesExtractor
19+
implements AttributesExtractor<ClientRequest, ClientResponse> {
20+
21+
private static final AttributeKey<String> SPRING_WEBFLUX_EVENT =
22+
stringKey("spring-webflux.event");
23+
private static final AttributeKey<String> SPRING_WEBFLUX_MESSAGE =
24+
stringKey("spring-webflux.message");
25+
26+
public static boolean enabled() {
27+
return Config.get()
28+
.getBoolean("otel.instrumentation.spring-webflux.experimental-span-attributes", false);
29+
}
30+
31+
@Override
32+
public void onStart(AttributesBuilder attributes, ClientRequest request) {}
33+
34+
@Override
35+
public void onEnd(
36+
AttributesBuilder attributes,
37+
ClientRequest request,
38+
@Nullable ClientResponse response,
39+
@Nullable Throwable error) {
40+
41+
// no response and no error means that the request has been cancelled
42+
if (response == null && error == null) {
43+
set(attributes, SPRING_WEBFLUX_EVENT, "cancelled");
44+
set(attributes, SPRING_WEBFLUX_MESSAGE, "The subscription was cancelled");
45+
}
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.spring.webflux.client;
7+
8+
import static java.util.Collections.emptyList;
9+
10+
import io.opentelemetry.instrumentation.api.instrumenter.http.CapturedHttpHeaders;
11+
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
12+
import java.lang.invoke.MethodHandle;
13+
import java.lang.invoke.MethodHandles;
14+
import java.lang.invoke.MethodType;
15+
import java.util.List;
16+
import javax.annotation.Nullable;
17+
import org.springframework.web.reactive.function.client.ClientRequest;
18+
import org.springframework.web.reactive.function.client.ClientResponse;
19+
20+
final class SpringWebfluxHttpAttributesExtractor
21+
extends HttpClientAttributesExtractor<ClientRequest, ClientResponse> {
22+
23+
private static final MethodHandle RAW_STATUS_CODE = findRawStatusCode();
24+
25+
// rawStatusCode() method was introduced in webflux 5.1
26+
// prior to this method, the best we can get is HttpStatus enum, which only covers standard status
27+
// codes (see usage below)
28+
private static MethodHandle findRawStatusCode() {
29+
try {
30+
return MethodHandles.publicLookup()
31+
.findVirtual(ClientResponse.class, "rawStatusCode", MethodType.methodType(int.class));
32+
} catch (IllegalAccessException | NoSuchMethodException e) {
33+
return null;
34+
}
35+
}
36+
37+
SpringWebfluxHttpAttributesExtractor(CapturedHttpHeaders capturedHttpHeaders) {
38+
super(capturedHttpHeaders);
39+
}
40+
41+
@Override
42+
protected String url(ClientRequest request) {
43+
return request.url().toString();
44+
}
45+
46+
@Nullable
47+
@Override
48+
protected String flavor(ClientRequest request, @Nullable ClientResponse response) {
49+
return null;
50+
}
51+
52+
@Override
53+
protected String method(ClientRequest request) {
54+
return request.method().name();
55+
}
56+
57+
@Override
58+
protected List<String> requestHeader(ClientRequest request, String name) {
59+
return request.headers().getOrDefault(name, emptyList());
60+
}
61+
62+
@Nullable
63+
@Override
64+
protected Long requestContentLength(ClientRequest request, @Nullable ClientResponse response) {
65+
return null;
66+
}
67+
68+
@Nullable
69+
@Override
70+
protected Long requestContentLengthUncompressed(
71+
ClientRequest request, @Nullable ClientResponse response) {
72+
return null;
73+
}
74+
75+
@Override
76+
protected Integer statusCode(ClientRequest request, ClientResponse response) {
77+
if (RAW_STATUS_CODE != null) {
78+
// rawStatusCode() method was introduced in webflux 5.1
79+
try {
80+
return (int) RAW_STATUS_CODE.invokeExact(response);
81+
} catch (Throwable ignored) {
82+
// Ignore
83+
}
84+
}
85+
// prior to webflux 5.1, the best we can get is HttpStatus enum, which only covers standard
86+
// status codes
87+
return response.statusCode().value();
88+
}
89+
90+
@Nullable
91+
@Override
92+
protected Long responseContentLength(ClientRequest request, ClientResponse response) {
93+
return null;
94+
}
95+
96+
@Nullable
97+
@Override
98+
protected Long responseContentLengthUncompressed(ClientRequest request, ClientResponse response) {
99+
return null;
100+
}
101+
102+
@Override
103+
protected List<String> responseHeader(
104+
ClientRequest request, ClientResponse response, String name) {
105+
return response.headers().header(name);
106+
}
107+
}

0 commit comments

Comments
 (0)