Skip to content

Commit 8659686

Browse files
committed
Merge branch 'gary/submit-evals-2' into gary/llmobs-java-sdk-integration
2 parents 8a9d17d + 19d95d7 commit 8659686

File tree

13 files changed

+577
-8
lines changed

13 files changed

+577
-8
lines changed

dd-java-agent/agent-llmobs/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ minimumInstructionCoverage = 0.0
2222

2323
dependencies {
2424
api libs.slf4j
25+
implementation libs.jctools
2526

2627
implementation project(':communication')
2728
implementation project(':components:json')
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package datadog.trace.llmobs;
2+
3+
import static datadog.trace.util.AgentThreadFactory.AgentThread.LLMOBS_EVALS_PROCESSOR;
4+
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
5+
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
6+
7+
import com.squareup.moshi.JsonAdapter;
8+
import com.squareup.moshi.Moshi;
9+
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
10+
import datadog.communication.ddagent.SharedCommunicationObjects;
11+
import datadog.communication.http.HttpRetryPolicy;
12+
import datadog.communication.http.OkHttpUtils;
13+
import datadog.trace.api.Config;
14+
import datadog.trace.llmobs.domain.LLMObsEval;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import java.util.concurrent.TimeUnit;
18+
import okhttp3.Headers;
19+
import okhttp3.HttpUrl;
20+
import okhttp3.OkHttpClient;
21+
import okhttp3.Request;
22+
import okhttp3.RequestBody;
23+
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class EvalProcessingWorker implements AutoCloseable {
28+
29+
private static final String EVAL_METRIC_API_DOMAIN = "api";
30+
private static final String EVAL_METRIC_API_PATH = "api/intake/llm-obs/v1/eval-metric";
31+
32+
private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain";
33+
private static final String DD_API_KEY_HEADER_NAME = "DD-API-KEY";
34+
35+
private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class);
36+
37+
private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
38+
private final Thread serializerThread;
39+
40+
public EvalProcessingWorker(
41+
final int capacity,
42+
final long flushInterval,
43+
final TimeUnit timeUnit,
44+
final SharedCommunicationObjects sco,
45+
Config config) {
46+
this.queue = new MpscBlockingConsumerArrayQueue<>(capacity);
47+
48+
boolean isAgentless = config.isLlmObsAgentlessEnabled();
49+
if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) {
50+
log.error("Agentless eval metric submission requires an API key");
51+
}
52+
53+
Headers headers;
54+
HttpUrl submissionUrl;
55+
if (isAgentless) {
56+
submissionUrl =
57+
HttpUrl.get(
58+
"https://"
59+
+ EVAL_METRIC_API_DOMAIN
60+
+ "."
61+
+ config.getSite()
62+
+ "/"
63+
+ EVAL_METRIC_API_PATH);
64+
headers = Headers.of(DD_API_KEY_HEADER_NAME, config.getApiKey());
65+
} else {
66+
submissionUrl =
67+
HttpUrl.get(
68+
sco.agentUrl.toString()
69+
+ DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT
70+
+ EVAL_METRIC_API_PATH);
71+
headers = Headers.of(EVP_SUBDOMAIN_HEADER_NAME, EVAL_METRIC_API_DOMAIN);
72+
}
73+
74+
EvalSerializingHandler serializingHandler =
75+
new EvalSerializingHandler(queue, flushInterval, timeUnit, submissionUrl, headers);
76+
this.serializerThread = newAgentThread(LLMOBS_EVALS_PROCESSOR, serializingHandler);
77+
}
78+
79+
public void start() {
80+
this.serializerThread.start();
81+
}
82+
83+
public boolean addToQueue(final LLMObsEval eval) {
84+
return queue.offer(eval);
85+
}
86+
87+
@Override
88+
public void close() {
89+
serializerThread.interrupt();
90+
try {
91+
serializerThread.join(THREAD_JOIN_TIMOUT_MS);
92+
} catch (InterruptedException ignored) {
93+
}
94+
}
95+
96+
public static class EvalSerializingHandler implements Runnable {
97+
98+
private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class);
99+
private static final int FLUSH_THRESHOLD = 50;
100+
101+
private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
102+
private final long ticksRequiredToFlush;
103+
private long lastTicks;
104+
105+
private final Moshi moshi;
106+
private final JsonAdapter<LLMObsEval.Request> evalJsonAdapter;
107+
private final OkHttpClient httpClient;
108+
private final HttpUrl submissionUrl;
109+
private final Headers headers;
110+
111+
private final List<LLMObsEval> buffer = new ArrayList<>();
112+
113+
public EvalSerializingHandler(
114+
final MpscBlockingConsumerArrayQueue<LLMObsEval> queue,
115+
final long flushInterval,
116+
final TimeUnit timeUnit,
117+
final HttpUrl submissionUrl,
118+
final Headers headers) {
119+
this.queue = queue;
120+
this.moshi = new Moshi.Builder().add(LLMObsEval.class, new LLMObsEval.Adapter()).build();
121+
122+
this.evalJsonAdapter = moshi.adapter(LLMObsEval.Request.class);
123+
this.httpClient = new OkHttpClient();
124+
this.submissionUrl = submissionUrl;
125+
this.headers = headers;
126+
127+
this.lastTicks = System.nanoTime();
128+
this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);
129+
130+
log.debug("starting eval metric serializer, url={}", submissionUrl);
131+
}
132+
133+
@Override
134+
public void run() {
135+
try {
136+
runDutyCycle();
137+
} catch (InterruptedException e) {
138+
Thread.currentThread().interrupt();
139+
}
140+
log.debug(
141+
"eval processor worker exited. submitting evals stopped. unsubmitted evals left: "
142+
+ !queuesAreEmpty());
143+
}
144+
145+
private void runDutyCycle() throws InterruptedException {
146+
Thread thread = Thread.currentThread();
147+
while (!thread.isInterrupted()) {
148+
consumeBatch();
149+
flushIfNecessary();
150+
}
151+
}
152+
153+
private void consumeBatch() {
154+
queue.drain(buffer::add, queue.size());
155+
}
156+
157+
protected void flushIfNecessary() {
158+
if (buffer.isEmpty()) {
159+
return;
160+
}
161+
if (shouldFlush()) {
162+
LLMObsEval.Request llmobsEvalReq = new LLMObsEval.Request(this.buffer);
163+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
164+
165+
String reqBod = evalJsonAdapter.toJson(llmobsEvalReq);
166+
167+
RequestBody requestBody =
168+
RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod);
169+
Request request =
170+
new Request.Builder().headers(headers).url(submissionUrl).post(requestBody).build();
171+
172+
try (okhttp3.Response response =
173+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
174+
175+
if (response.isSuccessful()) {
176+
log.debug("successfully flushed evaluation request with {} evals", this.buffer.size());
177+
this.buffer.clear();
178+
} else {
179+
log.error(
180+
"Could not submit eval metrics (HTTP code "
181+
+ response.code()
182+
+ ")"
183+
+ (response.body() != null ? ": " + response.body().string() : ""));
184+
}
185+
} catch (Exception e) {
186+
log.error("Could not submit eval metrics", e);
187+
}
188+
}
189+
}
190+
191+
private boolean shouldFlush() {
192+
long nanoTime = System.nanoTime();
193+
long ticks = nanoTime - lastTicks;
194+
if (ticks > ticksRequiredToFlush || queue.size() >= FLUSH_THRESHOLD) {
195+
lastTicks = nanoTime;
196+
return true;
197+
}
198+
return false;
199+
}
200+
201+
protected boolean queuesAreEmpty() {
202+
return queue.isEmpty();
203+
}
204+
}
205+
}

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
import datadog.trace.api.llmobs.LLMObsTags;
88
import datadog.trace.bootstrap.instrumentation.api.Tags;
99
import datadog.trace.llmobs.domain.DDLLMObsSpan;
10+
import datadog.trace.llmobs.domain.LLMObsEval;
1011
import datadog.trace.llmobs.domain.LLMObsInternal;
1112
import java.lang.instrument.Instrumentation;
13+
import java.util.Map;
14+
import java.util.concurrent.TimeUnit;
1215
import org.jetbrains.annotations.Nullable;
1316
import org.slf4j.Logger;
1417
import org.slf4j.LoggerFactory;
@@ -29,7 +32,100 @@ public static void start(Instrumentation inst, SharedCommunicationObjects sco) {
2932
sco.createRemaining(config);
3033

3134
LLMObsInternal.setLLMObsSpanFactory(
32-
new LLMObsManualSpanFactory(config.getLlmObsMlApp(), config.getServiceName()));
35+
new LLMObsManualSpanFactory(
36+
config.getLlmObsMlApp(), config.getServiceName()));
37+
38+
String mlApp = config.getLlmObsMlApp();
39+
LLMObsInternal.setLLMObsSpanFactory(
40+
new LLMObsManualSpanFactory(mlApp, config.getServiceName()));
41+
42+
LLMObsInternal.setLLMObsEvalProcessor(new LLMObsCustomEvalProcessor(mlApp, sco, config));
43+
}
44+
45+
private static class LLMObsCustomEvalProcessor implements LLMObs.LLMObsEvalProcessor {
46+
private final String defaultMLApp;
47+
private final EvalProcessingWorker evalProcessingWorker;
48+
49+
public LLMObsCustomEvalProcessor(
50+
String defaultMLApp, SharedCommunicationObjects sco, Config config) {
51+
52+
this.defaultMLApp = defaultMLApp;
53+
this.evalProcessingWorker =
54+
new EvalProcessingWorker(1024, 100, TimeUnit.MILLISECONDS, sco, config);
55+
this.evalProcessingWorker.start();
56+
}
57+
58+
@Override
59+
public void SubmitEvaluation(
60+
LLMObsSpan llmObsSpan, String label, double scoreValue, Map<String, Object> tags) {
61+
SubmitEvaluation(llmObsSpan, label, scoreValue, defaultMLApp, tags);
62+
}
63+
64+
@Override
65+
public void SubmitEvaluation(
66+
LLMObsSpan llmObsSpan,
67+
String label,
68+
double scoreValue,
69+
String mlApp,
70+
Map<String, Object> tags) {
71+
if (llmObsSpan == null) {
72+
LOGGER.error("null llm obs span provided, eval not recorded");
73+
return;
74+
}
75+
76+
if (mlApp == null || mlApp.isEmpty()) {
77+
mlApp = defaultMLApp;
78+
}
79+
String traceID = llmObsSpan.getTraceId().toHexString();
80+
long spanID = llmObsSpan.getSpanId();
81+
LLMObsEval.Score score =
82+
new LLMObsEval.Score(
83+
traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, scoreValue);
84+
if (!this.evalProcessingWorker.addToQueue(score)) {
85+
LOGGER.warn(
86+
"queue full, failed to add score eval, ml_app={}, trace_id={}, span_id={}, label={}",
87+
mlApp,
88+
traceID,
89+
spanID,
90+
label);
91+
}
92+
}
93+
94+
@Override
95+
public void SubmitEvaluation(
96+
LLMObsSpan llmObsSpan, String label, String categoricalValue, Map<String, Object> tags) {
97+
SubmitEvaluation(llmObsSpan, label, categoricalValue, defaultMLApp, tags);
98+
}
99+
100+
@Override
101+
public void SubmitEvaluation(
102+
LLMObsSpan llmObsSpan,
103+
String label,
104+
String categoricalValue,
105+
String mlApp,
106+
Map<String, Object> tags) {
107+
if (llmObsSpan == null) {
108+
LOGGER.error("null llm obs span provided, eval not recorded");
109+
return;
110+
}
111+
112+
if (mlApp == null || mlApp.isEmpty()) {
113+
mlApp = defaultMLApp;
114+
}
115+
String traceID = llmObsSpan.getTraceId().toHexString();
116+
long spanID = llmObsSpan.getSpanId();
117+
LLMObsEval.Categorical category =
118+
new LLMObsEval.Categorical(
119+
traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, categoricalValue);
120+
if (!this.evalProcessingWorker.addToQueue(category)) {
121+
LOGGER.warn(
122+
"queue full, failed to add categorical eval, ml_app={}, trace_id={}, span_id={}, label={}",
123+
mlApp,
124+
traceID,
125+
spanID,
126+
label);
127+
}
128+
}
33129
}
34130

35131
private static class LLMObsManualSpanFactory implements LLMObs.LLMObsSpanFactory {

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import datadog.context.ContextScope;
44
import datadog.trace.api.DDSpanTypes;
5+
import datadog.trace.api.DDTraceId;
56
import datadog.trace.api.llmobs.LLMObs;
67
import datadog.trace.api.llmobs.LLMObsSpan;
78
import datadog.trace.api.llmobs.LLMObsTags;
@@ -67,18 +68,22 @@ public DDLLMObsSpan(
6768
if (sessionId != null && !sessionId.isEmpty()) {
6869
this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID, sessionId);
6970
}
70-
71+
7172
AgentSpanContext parent = LLMObsState.getLLMObsParentContext();
7273
String parentSpanID = LLMObsState.ROOT_SPAN_ID;
7374
if (null != parent) {
7475
if (parent.getTraceId() != this.span.getTraceId()) {
75-
LOGGER.error("trace ID mismatch, retrieved parent from context trace_id={}, span_id={}, started span trace_id={}, span_id={}", parent.getTraceId(), parent.getSpanId(), this.span.getTraceId(), this.span.getSpanId());
76+
LOGGER.error(
77+
"trace ID mismatch, retrieved parent from context trace_id={}, span_id={}, started span trace_id={}, span_id={}",
78+
parent.getTraceId(),
79+
parent.getSpanId(),
80+
this.span.getTraceId(),
81+
this.span.getSpanId());
7682
} else {
7783
parentSpanID = String.valueOf(parent.getSpanId());
7884
}
7985
}
80-
this.span.setTag(
81-
LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, parentSpanID);
86+
this.span.setTag(LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, parentSpanID);
8287
this.scope = LLMObsState.attach();
8388
LLMObsState.setLLMObsParentContext(this.span.context());
8489
}
@@ -292,4 +297,14 @@ public void finish() {
292297
this.scope.close();
293298
this.finished = true;
294299
}
300+
301+
@Override
302+
public DDTraceId getTraceId() {
303+
return this.span.getTraceId();
304+
}
305+
306+
@Override
307+
public long getSpanId() {
308+
return this.span.getSpanId();
309+
}
295310
}

0 commit comments

Comments
 (0)