Skip to content

Commit df87a93

Browse files
committed
code review fixes, add version tag
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent f1adf64 commit df87a93

File tree

6 files changed

+40
-9
lines changed

6 files changed

+40
-9
lines changed

dd-java-agent/instrumentation/spark/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ configurations.all {
77
resolutionStrategy.deactivateDependencyLocking()
88
}
99
dependencies {
10+
implementation project(':utils:version-utils')
1011
compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
1112
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
1213

dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,13 @@ import org.apache.spark.scheduler.SparkListener
55

66
class SparkListenerTest extends AbstractSparkListenerTest {
77
@Override
8-
protected SparkListener getTestDatadogSparkListener() {
8+
protected DatadogSpark212Listener getTestDatadogSparkListener() {
99
def conf = new SparkConf()
1010
return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
1111
}
12+
13+
@Override
14+
protected DatadogSpark212Listener getTestDatadogSparkListener(SparkConf conf) {
15+
return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
16+
}
1217
}

dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,13 @@ import org.apache.spark.scheduler.SparkListener
55

66
class SparkListenerTest extends AbstractSparkListenerTest {
77
@Override
8-
protected SparkListener getTestDatadogSparkListener() {
8+
protected DatadogSpark213Listener getTestDatadogSparkListener() {
99
def conf = new SparkConf()
1010
return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
1111
}
12+
13+
@Override
14+
protected DatadogSpark213Listener getTestDatadogSparkListener(SparkConf conf) {
15+
return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
16+
}
1217
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.fasterxml.jackson.databind.JsonNode;
99
import com.fasterxml.jackson.databind.ObjectMapper;
10+
import datadog.common.version.VersionInfo;
1011
import datadog.trace.api.Config;
1112
import datadog.trace.api.DDTags;
1213
import datadog.trace.api.DDTraceId;
@@ -168,7 +169,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
168169
}
169170

170171
public void setupOpenLineage(DDTraceId traceId) {
171-
log.error("Setting up OpenLineage configuration with trace id {}", traceId);
172+
log.debug("Setting up OpenLineage configuration with trace id {}", traceId);
172173
if (openLineageSparkListener != null) {
173174
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
174175
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -185,7 +186,9 @@ public void setupOpenLineage(DDTraceId traceId) {
185186
+ ";_dd.ol_intake.emit_spans:false;_dd.ol_service:"
186187
+ sparkServiceName
187188
+ ";_dd.ol_app_id:"
188-
+ appId);
189+
+ appId
190+
+ ";_dd.tracer_version:"
191+
+ VersionInfo.VERSION);
189192
return;
190193
}
191194
log.debug(

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
5050
}
5151

5252
if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
53-
log.error("Have parent info, but not root parent info. Can't construct valid trace id.");
53+
log.debug("Found parent info, but not root parent info. Can't construct valid trace id.");
5454
return Optional.empty();
5555
}
5656

@@ -59,7 +59,7 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
5959
String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);
6060

6161
if (!UUID.matcher(parentRunId).matches()) {
62-
log.error("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
62+
log.debug("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
6363
return Optional.empty();
6464
}
6565

@@ -68,7 +68,7 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
6868
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID);
6969

7070
if (!UUID.matcher(rootParentRunId).matches()) {
71-
log.error("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
71+
log.debug("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
7272
return Optional.empty();
7373
}
7474

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding
44
import com.datadoghq.sketch.ddsketch.proto.DDSketch
55
import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore
66
import datadog.trace.agent.test.AgentTestRunner
7+
import org.apache.spark.SparkConf
78
import org.apache.spark.Success$
89
import org.apache.spark.executor.TaskMetrics
910
import org.apache.spark.scheduler.JobSucceeded$
10-
import org.apache.spark.scheduler.SparkListener
1111
import org.apache.spark.scheduler.SparkListenerApplicationEnd
1212
import org.apache.spark.scheduler.SparkListenerApplicationStart
1313
import org.apache.spark.scheduler.SparkListenerExecutorAdded
@@ -30,7 +30,8 @@ import scala.collection.JavaConverters
3030

3131
abstract class AbstractSparkListenerTest extends AgentTestRunner {
3232

33-
protected abstract SparkListener getTestDatadogSparkListener()
33+
protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener()
34+
protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener(SparkConf conf)
3435

3536
protected applicationStartEvent(time=0L) {
3637
// Constructor of SparkListenerApplicationStart changed starting spark 3.0
@@ -457,6 +458,22 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner {
457458
}
458459
}
459460

461+
def "sets up OpenLineage trace id properly"() {
462+
setup:
463+
def conf = new SparkConf()
464+
conf.set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84")
465+
conf.set("spark.openlineage.parentJobNamespace", "default")
466+
conf.set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3")
467+
conf.set("spark.openlineage.rootParentRunId", "01964820-5280-7674-b04e-82fbed085f39")
468+
conf.set("spark.openlineage.rootParentJobNamespace", "default")
469+
conf.set("spark.openlineage.rootParentJobName", "dag-push-to-s3-spark")
470+
def listener = getTestDatadogSparkListener(conf)
471+
472+
expect:
473+
listener.onApplicationStart(applicationStartEvent(1000L))
474+
assert listener.openLineageSparkConf.get("spark.openlineage.run.tags").contains("13959090542865903119")
475+
}
476+
460477
protected validateRelativeError(double value, double expected, double relativeAccuracy) {
461478
double relativeError = Math.abs(value - expected) / expected
462479
assert relativeError < relativeAccuracy

0 commit comments

Comments
 (0)