Service name override for DSM checkpoints in Spark context
kr-igor committed Dec 10, 2024
1 parent 88c9405 commit 6bfd76b
Showing 15 changed files with 363 additions and 138 deletions.
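For orientation before the diff: the commit centralizes service-name resolution in a new SparkConfUtils helper (added below), whose getServiceNameOverride prefers a Databricks-derived name over the Spark application name, and applies the result to both spark spans and DSM checkpoints. A minimal sketch of how a caller might consult it; the class and conf keys are taken from the diff below, but running this outside the agent is hypothetical since Config.get() is consulted internally:

import org.apache.spark.SparkConf;
import datadog.trace.instrumentation.spark.SparkConfUtils;

class ServiceNameOverrideSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.set("spark.app.name", "my-streaming-app");

    // Null unless a Databricks-derived name applies or
    // Config.get().useSparkAppNameAsService() is enabled.
    String override = SparkConfUtils.getServiceNameOverride(conf);
    if (override != null) {
      System.out.println("Spark spans and DSM checkpoints will use service: " + override);
    }
  }
}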
12 changes: 12 additions & 0 deletions dd-java-agent/instrumentation/spark/spark_2.12/build.gradle
@@ -41,13 +41,24 @@ dependencies {
testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion"

+testImplementation group: 'org.apache.spark', name: "spark-sql-kafka-0-10_$scalaVersion", version: "$sparkVersion"
+testImplementation group: 'org.apache.kafka', name: "kafka_$scalaVersion", version: '2.4.0'
+testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0'
+testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.4.0.RELEASE'
+testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.4.0.RELEASE'

+testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-0.11')

test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.8"
test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.8"
test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.8"
+test_spark24Implementation group: 'org.apache.spark', name: "spark-sql-kafka-0-10_$scalaVersion", version: "2.4.8"

test_spark32Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.2.4"
test_spark32Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.2.4"
test_spark32Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.2.4"
+test_spark32Implementation group: 'org.apache.spark', name: "spark-sql-kafka-0-10_$scalaVersion", version: "3.2.4"

// We do not support netty versions older than this because of a change to the number of parameters to the
// PooledByteBufAllocator constructor. See this PR where the new constructor (the only one we support) was introduced:
// https://github.com/netty/netty/pull/10267
@@ -56,6 +67,7 @@ dependencies {
latestDepTestImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: '+'
+latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql-kafka-0-10_$scalaVersion", version: "+"
}

tasks.named("test").configure {
@@ -20,6 +20,7 @@ public String[] helperClassNames() {
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
packageName + ".SparkConfUtils",
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
@@ -0,0 +1,89 @@
import datadog.trace.agent.test.AgentTestRunner
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
import org.springframework.kafka.test.utils.KafkaTestUtils

class SparkStreamingKafkaTest extends AgentTestRunner {
static final SOURCE_TOPIC = "source"
static final SINK_TOPIC = "sink"

@Override
boolean isDataStreamsEnabled() {
return true
}

@Rule
EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC)
EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka

@Override
void configurePreAgent() {
super.configurePreAgent()
injectSysConfig("dd.integration.spark.enabled", "true")
injectSysConfig("dd.integration.kafka.enabled", "true")
}

def "test dsm checkpoints are correctly set"() {
setup:
def appName = "test-app"
def sparkSession = SparkSession.builder()
.config("spark.master", "local[2]")
.config("spark.driver.bindAddress", "localhost")
.appName(appName)
.getOrCreate()

def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
def producer = new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer()

when:
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString()))
}
producer.flush()

def df = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.option("subscribe", SOURCE_TOPIC)
.load()

def query = df
.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
.option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString())
.option("topic", SINK_TOPIC)
.trigger(Trigger.Once())
.foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
@Override
void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
rowDataset.show()
rowDataset.write()
}
})
.start()

query.processAllAvailable()

then:
query.stop()
producer.close()

// check that checkpoints were written with the service name overridden to the Spark application name
assert TEST_DATA_STREAMS_WRITER.payloads.size() > 0
assert TEST_DATA_STREAMS_WRITER.services.size() == 1
assert TEST_DATA_STREAMS_WRITER.services.get(0) == appName
}
}
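For readers less familiar with the Spring test scaffolding above, the producer setup corresponds to this plain kafka-clients equivalent. A sketch only: the localhost:9092 address stands in for the embedded broker's dynamically assigned address, and the topic name mirrors SOURCE_TOPIC from the test:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

class ProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: stand-in broker address
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    try (Producer<Integer, String> producer = new KafkaProducer<>(props)) {
      // Send the same 100 keyed records the test produces to the source topic.
      for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>("source", i, Integer.toString(i)));
      }
      producer.flush();
    }
  }
}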
@@ -20,6 +20,7 @@ public String[] helperClassNames() {
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
packageName + ".SparkConfUtils",
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
@@ -4,10 +4,12 @@
import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
+import static datadog.trace.instrumentation.spark.SparkConfUtils.getDatabricksClusterName;
+import static datadog.trace.instrumentation.spark.SparkConfUtils.getIsRunningOnDatabricks;
+import static datadog.trace.instrumentation.spark.SparkConfUtils.getServiceNameOverride;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.sampling.PrioritySampling;
@@ -110,8 +112,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {

private final boolean isRunningOnDatabricks;
private final String databricksClusterName;
-private final String databricksServiceName;
-private final String sparkServiceName;
+private final String serviceNameOverride;

private boolean lastJobFailed = false;
private String lastJobFailedMessage;
@@ -130,10 +131,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
this.appId = appId;
this.sparkVersion = sparkVersion;

-isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId");
-databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
-databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
-sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
+isRunningOnDatabricks = getIsRunningOnDatabricks(sparkConf);
+databricksClusterName = getDatabricksClusterName(sparkConf);
+serviceNameOverride = getServiceNameOverride(sparkConf);

// If the JVM exits with System.exit(code), it bypasses the code closing the application span
//
@@ -924,10 +924,8 @@ private AgentTracer.SpanBuilder buildSparkSpan(String spanName, Properties prope
AgentTracer.SpanBuilder builder =
tracer.buildSpan(spanName).withSpanType("spark").withTag("app_id", appId);

-if (databricksServiceName != null) {
-builder.withServiceName(databricksServiceName);
-} else if (sparkServiceName != null) {
-builder.withServiceName(sparkServiceName);
+if (serviceNameOverride != null) {
+builder.withServiceName(serviceNameOverride);
}

addPropertiesTags(builder, properties);
@@ -1153,45 +1151,6 @@ private static String getBatchIdFromBatchKey(String batchKey) {
return batchKey.substring(batchKey.lastIndexOf(".") + 1);
}

-private static String getDatabricksServiceName(SparkConf conf, String databricksClusterName) {
-if (Config.get().isServiceNameSetByUser()) {
-return null;
-}
-
-String serviceName = null;
-String runName = getDatabricksRunName(conf);
-if (runName != null) {
-serviceName = "databricks.job-cluster." + runName;
-} else if (databricksClusterName != null) {
-serviceName = "databricks.all-purpose-cluster." + databricksClusterName;
-}
-
-return serviceName;
-}
-
-private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) {
-// If config is not set or running on databricks, not changing the service name
-if (!Config.get().useSparkAppNameAsService() || isRunningOnDatabricks) {
-return null;
-}
-
-// Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
-String serviceName = Config.get().getServiceName();
-if (Config.get().isServiceNameSetByUser()
-&& !"spark".equals(serviceName)
-&& !"hadoop".equals(serviceName)) {
-log.debug("Service '{}' explicitly set by user, not using the application name", serviceName);
-return null;
-}
-
-String sparkAppName = conf.get("spark.app.name", null);
-if (sparkAppName != null) {
-log.info("Using Spark application name '{}' as the Datadog service name", sparkAppName);
-}
-
-return sparkAppName;
-}

private static void reportKafkaOffsets(
final String appName, final AgentSpan span, final SourceProgress progress) {
if (!span.traceConfig().isDataStreamsEnabled()
@@ -1234,34 +1193,4 @@ private static void reportKafkaOffsets(
}
}
}

-private static String getDatabricksRunName(SparkConf conf) {
-String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null);
-if (allTags == null) {
-return null;
-}
-
-try {
-// Using the jackson JSON lib used by spark
-// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
-JsonNode jsonNode = objectMapper.readTree(allTags);
-
-for (JsonNode node : jsonNode) {
-String key = node.get("key").asText();
-if ("RunName".equals(key)) {
-// Databricks jobs launched by Azure Data Factory have an uuid at the end of the name
-return removeUuidFromEndOfString(node.get("value").asText());
-}
-}
-} catch (Exception ignored) {
-}
-
-return null;
-}
-
-@SuppressForbidden // called at most once per spark application
-private static String removeUuidFromEndOfString(String input) {
-return input.replaceAll(
-"_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
-}
}
@@ -0,0 +1,100 @@
package datadog.trace.instrumentation.spark;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Config;
import de.thetaphi.forbiddenapis.SuppressForbidden;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkConfUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(SparkConfUtils.class);

public static boolean getIsRunningOnDatabricks(SparkConf sparkConf) {
return sparkConf.contains("spark.databricks.sparkContextId");
}

public static String getDatabricksClusterName(SparkConf sparkConf) {
return sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
}

public static String getDatabricksServiceName(SparkConf conf, String databricksClusterName) {
if (Config.get().isServiceNameSetByUser()) {
return null;
}

String serviceName = null;
String runName = getDatabricksRunName(conf);
if (runName != null) {
serviceName = "databricks.job-cluster." + runName;
} else if (databricksClusterName != null) {
serviceName = "databricks.all-purpose-cluster." + databricksClusterName;
}

return serviceName;
}

public static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) {
// If the config is not set or we are running on Databricks, do not change the service name
if (!Config.get().useSparkAppNameAsService() || isRunningOnDatabricks) {
return null;
}

// Keep the service name set by the user, unless it is only "spark" or "hadoop", which can be set by USM
String serviceName = Config.get().getServiceName();
if (Config.get().isServiceNameSetByUser()
&& !"spark".equals(serviceName)
&& !"hadoop".equals(serviceName)) {
log.debug("Service '{}' explicitly set by user, not using the application name", serviceName);
return null;
}

String sparkAppName = conf.get("spark.app.name", null);
if (sparkAppName != null) {
log.info("Using Spark application name '{}' as the Datadog service name", sparkAppName);
}

return sparkAppName;
}

public static String getServiceNameOverride(SparkConf conf) {
boolean isRunningOnDatabricks = getIsRunningOnDatabricks(conf);
String databricksClusterName = getDatabricksClusterName(conf);
String databricksServiceName = getDatabricksServiceName(conf, databricksClusterName);
String sparkServiceName = getSparkServiceName(conf, isRunningOnDatabricks);

return databricksServiceName != null ? databricksServiceName : sparkServiceName;
}

private static String getDatabricksRunName(SparkConf conf) {
String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null);
if (allTags == null) {
return null;
}

try {
// Using the Jackson JSON lib used by Spark
// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
JsonNode jsonNode = objectMapper.readTree(allTags);

for (JsonNode node : jsonNode) {
String key = node.get("key").asText();
if ("RunName".equals(key)) {
// Databricks jobs launched by Azure Data Factory have a UUID at the end of the name
return removeUuidFromEndOfString(node.get("value").asText());
}
}
} catch (Exception ignored) {
}

return null;
}

@SuppressForbidden // called at most once per spark application
private static String removeUuidFromEndOfString(String input) {
return input.replaceAll(
"_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
}
}
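To make the resolution order concrete, here is a hedged sketch of what getServiceNameOverride returns on a Databricks job cluster. The clusterAllTags JSON value is a hypothetical example of the format getDatabricksRunName parses, and the sketch assumes no user-set service name (Config.get().isServiceNameSetByUser() is false):

import org.apache.spark.SparkConf;
import datadog.trace.instrumentation.spark.SparkConfUtils;

class SparkConfUtilsSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    // Presence of this key marks the app as running on Databricks.
    conf.set("spark.databricks.sparkContextId", "1234567890");
    // Hypothetical tags; Azure Data Factory appends a UUID to RunName.
    conf.set("spark.databricks.clusterUsageTags.clusterAllTags",
        "[{\"key\":\"RunName\",\"value\":\"nightly_etl_123e4567-e89b-12d3-a456-426614174000\"}]");

    // The UUID suffix is stripped, then the job-cluster prefix is applied:
    // prints "databricks.job-cluster.nightly_etl"
    System.out.println(SparkConfUtils.getServiceNameOverride(conf));
  }
}

The Databricks-derived name takes precedence; on a non-Databricks cluster the same call would fall through to getSparkServiceName and return the spark.app.name value when useSparkAppNameAsService() is enabled.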