Skip to content

Commit 4defaba

Browse files
ArtyomGabeevhannahchan
authored andcommitted
StructuredTaskScope instrumentation (open-telemetry#11202)
1 parent 35381eb commit 4defaba

File tree

3 files changed

+137
-1
lines changed

3 files changed

+137
-1
lines changed

instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
3333
new JavaForkJoinTaskInstrumentation(),
3434
new RunnableInstrumentation(),
3535
new ThreadPoolExtendingExecutorInstrumentation(),
36-
new VirtualThreadInstrumentation());
36+
new VirtualThreadInstrumentation(),
37+
new StructuredTaskScopeInstrumentation());
3738
}
3839
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.executors;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
10+
11+
import io.opentelemetry.context.Context;
12+
import io.opentelemetry.instrumentation.api.util.VirtualField;
13+
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
14+
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
15+
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
16+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
17+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
18+
import java.util.concurrent.Callable;
19+
import net.bytebuddy.asm.Advice;
20+
import net.bytebuddy.description.type.TypeDescription;
21+
import net.bytebuddy.matcher.ElementMatcher;
22+
23+
public class StructuredTaskScopeInstrumentation implements TypeInstrumentation {
24+
25+
@Override
26+
public ElementMatcher<TypeDescription> typeMatcher() {
27+
return named("java.util.concurrent.StructuredTaskScope");
28+
}
29+
30+
@Override
31+
public void transform(TypeTransformer transformer) {
32+
transformer.applyAdviceToMethod(
33+
named("fork").and(takesArgument(0, Callable.class)),
34+
this.getClass().getName() + "$ForkCallableAdvice");
35+
}
36+
37+
@SuppressWarnings("unused")
38+
public static class ForkCallableAdvice {
39+
40+
@Advice.OnMethodEnter(suppress = Throwable.class)
41+
public static PropagatedContext enterCallableFork(@Advice.Argument(0) Callable<?> task) {
42+
Context context = Java8BytecodeBridge.currentContext();
43+
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
44+
VirtualField<Callable<?>, PropagatedContext> virtualField =
45+
VirtualField.find(Callable.class, PropagatedContext.class);
46+
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
47+
}
48+
return null;
49+
}
50+
51+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
52+
public static void exitCallableFork(
53+
@Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable) {
54+
ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable);
55+
}
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.executors;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import io.opentelemetry.api.trace.SpanKind;
11+
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
12+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
13+
import java.lang.reflect.Method;
14+
import java.util.concurrent.Callable;
15+
import org.junit.jupiter.api.Test;
16+
import org.junit.jupiter.api.condition.EnabledForJreRange;
17+
import org.junit.jupiter.api.condition.JRE;
18+
import org.junit.jupiter.api.extension.RegisterExtension;
19+
20+
@EnabledForJreRange(min = JRE.JAVA_21)
21+
class StructuredTaskScopeTest {
22+
23+
@RegisterExtension
24+
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
25+
26+
@Test
27+
void multipleForkJoin() throws Exception {
28+
Class<?> sofTaskScopeClass =
29+
Class.forName("java.util.concurrent.StructuredTaskScope$ShutdownOnFailure");
30+
Object taskScope = sofTaskScopeClass.getDeclaredConstructor().newInstance();
31+
Class<?> taskScopeClass = Class.forName("java.util.concurrent.StructuredTaskScope");
32+
Method forkMethod = taskScopeClass.getDeclaredMethod("fork", Callable.class);
33+
Method joinMethod = taskScopeClass.getDeclaredMethod("join");
34+
Method closeMethod = taskScopeClass.getDeclaredMethod("close");
35+
36+
Class<?> subtaskClass = Class.forName("java.util.concurrent.StructuredTaskScope$Subtask");
37+
Method getMethod = subtaskClass.getDeclaredMethod("get");
38+
39+
Callable<String> callable1 =
40+
() -> {
41+
testing.runWithSpan("task1", () -> {});
42+
return "a";
43+
};
44+
Callable<String> callable2 =
45+
() -> {
46+
testing.runWithSpan("task2", () -> {});
47+
return "b";
48+
};
49+
50+
String result =
51+
testing.runWithSpan(
52+
"parent",
53+
() -> {
54+
try {
55+
Object fork1 = forkMethod.invoke(taskScope, callable1);
56+
Object fork2 = forkMethod.invoke(taskScope, callable2);
57+
joinMethod.invoke(taskScope);
58+
59+
return "" + getMethod.invoke(fork1) + getMethod.invoke(fork2);
60+
} catch (Exception e) {
61+
throw new AssertionError(e);
62+
}
63+
});
64+
65+
assertThat(result).isEqualTo("ab");
66+
67+
testing.waitAndAssertTraces(
68+
trace ->
69+
trace.hasSpansSatisfyingExactlyInAnyOrder(
70+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
71+
span ->
72+
span.hasName("task1").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)),
73+
span ->
74+
span.hasName("task2").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0))));
75+
76+
closeMethod.invoke(taskScope);
77+
}
78+
}

0 commit comments

Comments
 (0)