Skip to content

Commit 43a996f

Browse files
committed
use trace ID to map to individual stacks of active contexts
1 parent 039b905 commit 43a996f

File tree

10 files changed

+181
-155
lines changed

10 files changed

+181
-155
lines changed

communication/src/main/java/datadog/communication/serialization/json/JSONWritableFormatter.java

Lines changed: 0 additions & 131 deletions
This file was deleted.

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@
44
import datadog.communication.BackendApiFactory;
55
import datadog.communication.ddagent.SharedCommunicationObjects;
66
import datadog.trace.api.Config;
7+
import datadog.trace.api.DDTraceId;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
9+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
10+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
11+
import datadog.trace.llmobs.domain.SpanContextInfo;
12+
import java.util.Deque;
13+
import java.util.Map;
14+
import java.util.NoSuchElementException;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.ConcurrentLinkedDeque;
17+
import javax.annotation.Nonnull;
718
import org.slf4j.Logger;
819
import org.slf4j.LoggerFactory;
920

@@ -14,9 +25,89 @@ public class LLMObsServices {
1425
final Config config;
1526
final BackendApi backendApi;
1627

28+
Map<DDTraceId, Deque<SpanContextInfo>> activeSpanContextByTID = new ConcurrentHashMap<>();
29+
1730
LLMObsServices(Config config, SharedCommunicationObjects sco) {
1831
this.config = config;
1932
this.backendApi =
2033
new BackendApiFactory(config, sco).createBackendApi(BackendApiFactory.Intake.LLMOBS_API);
2134
}
35+
36+
@Nonnull
37+
public SpanContextInfo getActiveSpanContext() {
38+
// Valid case: possibly start root llm obs span/trace while there is NOT an active apm trace
39+
AgentScope activeScope = AgentTracer.activeScope();
40+
if (activeScope == null) {
41+
logger.warn("NO ACTIVE SPAN CONTEXT");
42+
return new SpanContextInfo();
43+
}
44+
45+
// Unexpected case: null active scope span, log to avoid crashes
46+
if (activeScope.span() == null) {
47+
logger.warn("active span scope found but no null span");
48+
return new SpanContextInfo();
49+
}
50+
51+
// Unexpected case: null trace ID, log to avoid crashes
52+
DDTraceId traceId = activeScope.span().getTraceId();
53+
if (traceId == null) {
54+
logger.warn("active scope found but unexpectedly null trace ID");
55+
return new SpanContextInfo();
56+
}
57+
58+
Deque<SpanContextInfo> activeSpanCtxForTID = activeSpanContextByTID.get(traceId);
59+
// Valid case: possibly start root llm obs span/trace while there's an active apm trace
60+
if (activeSpanCtxForTID == null || activeSpanCtxForTID.isEmpty()) {
61+
logger.warn("NO ACTIVE SPAN CONTEXT FOR TRACE ID {}", traceId);
62+
return new SpanContextInfo();
63+
}
64+
65+
// Valid case: possibly start child llm obs span for a given trace ID
66+
return activeSpanCtxForTID.peek();
67+
}
68+
69+
public void setActiveSpanContext(SpanContextInfo spanContext) {
70+
AgentSpanContext activeCtx = spanContext.getActiveContext();
71+
if (activeCtx == null) {
72+
logger.warn("unexpected null active context");
73+
return;
74+
}
75+
76+
DDTraceId traceId = activeCtx.getTraceId();
77+
if (traceId == null) {
78+
logger.warn("unexpected null trace ID");
79+
return;
80+
}
81+
82+
Deque<SpanContextInfo> contexts = activeSpanContextByTID.get(activeCtx.getTraceId());
83+
if (contexts == null) {
84+
contexts = new ConcurrentLinkedDeque<>();
85+
}
86+
contexts.push(spanContext);
87+
this.activeSpanContextByTID.put(traceId, contexts);
88+
logger.warn("ADDED TRACE ID: {} SPAN CONTEXTS: {}", traceId, contexts);
89+
}
90+
91+
public void removeActiveSpanContext(DDTraceId traceId) {
92+
if (!activeSpanContextByTID.containsKey(traceId)) {
93+
logger.debug("active span contexts not found for trace {}", traceId);
94+
return;
95+
}
96+
Deque<SpanContextInfo> contexts = activeSpanContextByTID.get(traceId);
97+
if (contexts == null) {
98+
return;
99+
}
100+
if (!contexts.isEmpty()) {
101+
try {
102+
contexts.pop();
103+
if (contexts.isEmpty()) {
104+
// the trace MAY still be active, however, the next set should re-create the hierarchy as
105+
// needed
106+
activeSpanContextByTID.remove(traceId);
107+
}
108+
} catch (NoSuchElementException noSuchElementException) {
109+
logger.debug("failed to pop context stack for trace {}", traceId);
110+
}
111+
}
112+
}
22113
}

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,12 @@ public LLMObsSpan startLLMSpan(
5555

5656
DDLLMObsSpan span =
5757
new DDLLMObsSpan(
58-
Tags.LLMOBS_LLM_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName);
58+
Tags.LLMOBS_LLM_SPAN_KIND,
59+
spanName,
60+
getMLApp(mlApp),
61+
sessionID,
62+
serviceName,
63+
llmObsServices);
5964

6065
span.setTag(LLMObsTags.MODEL_NAME, modelName);
6166
span.setTag(LLMObsTags.MODEL_PROVIDER, modelProvider);
@@ -66,28 +71,48 @@ public LLMObsSpan startLLMSpan(
6671
public LLMObsSpan startAgentSpan(
6772
String spanName, @Nullable String mlApp, @Nullable String sessionID) {
6873
return new DDLLMObsSpan(
69-
Tags.LLMOBS_AGENT_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName);
74+
Tags.LLMOBS_AGENT_SPAN_KIND,
75+
spanName,
76+
getMLApp(mlApp),
77+
sessionID,
78+
serviceName,
79+
llmObsServices);
7080
}
7181

7282
@Override
7383
public LLMObsSpan startToolSpan(
7484
String spanName, @Nullable String mlApp, @Nullable String sessionID) {
7585
return new DDLLMObsSpan(
76-
Tags.LLMOBS_TOOL_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName);
86+
Tags.LLMOBS_TOOL_SPAN_KIND,
87+
spanName,
88+
getMLApp(mlApp),
89+
sessionID,
90+
serviceName,
91+
llmObsServices);
7792
}
7893

7994
@Override
8095
public LLMObsSpan startTaskSpan(
8196
String spanName, @Nullable String mlApp, @Nullable String sessionID) {
8297
return new DDLLMObsSpan(
83-
Tags.LLMOBS_TASK_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName);
98+
Tags.LLMOBS_TASK_SPAN_KIND,
99+
spanName,
100+
getMLApp(mlApp),
101+
sessionID,
102+
serviceName,
103+
llmObsServices);
84104
}
85105

86106
@Override
87107
public LLMObsSpan startWorkflowSpan(
88108
String spanName, @Nullable String mlApp, @Nullable String sessionID) {
89109
return new DDLLMObsSpan(
90-
Tags.LLMOBS_WORKFLOW_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName);
110+
Tags.LLMOBS_WORKFLOW_SPAN_KIND,
111+
spanName,
112+
getMLApp(mlApp),
113+
sessionID,
114+
serviceName,
115+
llmObsServices);
91116
}
92117

93118
private String getMLApp(String mlApp) {

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package datadog.trace.llmobs.domain;
22

33
import datadog.trace.api.DDSpanTypes;
4+
import datadog.trace.api.DDTraceId;
45
import datadog.trace.api.llmobs.LLMObsSpan;
56
import datadog.trace.api.llmobs.LLMObsTags;
67
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
79
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
810
import datadog.trace.bootstrap.instrumentation.api.Tags;
11+
import datadog.trace.llmobs.LLMObsServices;
912
import java.util.Arrays;
1013
import java.util.Collections;
1114
import java.util.HashMap;
@@ -44,36 +47,51 @@ private enum State {
4447

4548
private static final String LLM_OBS_INSTRUMENTATION_NAME = "llmobs";
4649

50+
private static final String PARENT_ID_TAG_INTERNAL = "parent_id";
51+
4752
private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class);
4853

4954
private final AgentSpan span;
5055
private final String spanKind;
5156

5257
private boolean finished = false;
5358

54-
private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class);
59+
private final LLMObsServices llmObsServices;
5560

5661
public DDLLMObsSpan(
5762
@Nonnull String kind,
5863
String spanName,
5964
@Nonnull String mlApp,
6065
String sessionID,
61-
@Nonnull String serviceName) {
66+
@Nonnull String serviceName,
67+
@Nonnull LLMObsServices llmObsServices) {
6268

6369
if (null == spanName || spanName.isEmpty()) {
6470
spanName = kind;
6571
}
6672

73+
this.llmObsServices = llmObsServices;
74+
6775
AgentTracer.SpanBuilder spanBuilder =
6876
AgentTracer.get()
6977
.buildSpan(LLM_OBS_INSTRUMENTATION_NAME, spanName)
7078
.withServiceName(serviceName)
7179
.withSpanType(DDSpanTypes.LLMOBS);
7280

81+
SpanContextInfo activeSpanCtxInfo = this.llmObsServices.getActiveSpanContext();
82+
AgentSpanContext activeCtx = activeSpanCtxInfo.getActiveContext();
83+
if (!activeSpanCtxInfo.isRoot() && null != activeCtx) {
84+
spanBuilder.asChildOf(activeCtx);
85+
}
7386
this.span = spanBuilder.start();
87+
this.llmObsServices.setActiveSpanContext(
88+
new SpanContextInfo(span.context(), String.valueOf(span.context().getSpanId())));
89+
7490
this.span.setTag(SPAN_KIND, kind);
7591
this.spanKind = kind;
7692
this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.ML_APP, mlApp);
93+
this.span.setTag(
94+
LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, activeSpanCtxInfo.getParentSpanID());
7795
if (sessionID != null && !sessionID.isEmpty()) {
7896
this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID, sessionID);
7997
}
@@ -111,7 +129,6 @@ public void annotateIO(
111129
if (finished) {
112130
return;
113131
}
114-
LOGGER.warn("ANNOTATE IN {} OUT {}", inputData, outputData);
115132
if (inputData != null && !inputData.isEmpty()) {
116133
State inputState = validateIOMessages(inputData);
117134
if (validateIOMessages(inputData) != State.VALID) {
@@ -297,7 +314,9 @@ public void finish() {
297314
if (finished) {
298315
return;
299316
}
317+
DDTraceId traceId = this.span.getTraceId();
300318
this.span.finish();
301319
this.finished = true;
320+
this.llmObsServices.removeActiveSpanContext(traceId);
302321
}
303322
}

0 commit comments

Comments
 (0)