Skip to content

Commit

Permalink
Revert "removed config variable for disabling kafka-clients" (#7909)
Browse files Browse the repository at this point in the history
Kafka 3.8+ disabled by default to mitigate an ongoing bug

This reverts commit 8c7495a.

(cherry picked from commit ee7f366)

Co-authored-by: nayeem-kamal <[email protected]>
  • Loading branch information
ygree and nayeem-kamal authored Nov 7, 2024
1 parent d14e296 commit d302fe3
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -17,6 +18,11 @@ public ConsumerCoordinatorInstrumentation() {
super("kafka");
}

@Override
public boolean isEnabled() {
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStores = new HashMap<>(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.description.type.TypeDescription;
Expand All @@ -30,6 +31,11 @@ public KafkaConsumerInfoInstrumentation() {
super("kafka");
}

@Override
public boolean isEnabled() {
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStores = new HashMap<>(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -24,6 +25,11 @@ public KafkaConsumerInstrumentation() {
super("kafka");
}

@Override
public boolean isEnabled() {
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStores = new HashMap<>(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.Map;

@AutoService(InstrumenterModule.class)
Expand All @@ -20,6 +21,11 @@ public KafkaProducerInstrumentation() {
super("kafka");
}

@Override
public boolean isEnabled() {
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
}

@Override
public String instrumentedType() {
return "org.apache.kafka.clients.producer.KafkaProducer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.description.type.TypeDescription;
Expand All @@ -30,6 +31,11 @@ public LegacyKafkaConsumerInfoInstrumentation() {
super("kafka");
}

@Override
public boolean isEnabled() {
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStores = new HashMap<>(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.Map;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -21,6 +22,11 @@ public MetadataInstrumentation() {
super("kafka");
}

@Override
public boolean isEnabled() {
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
}

@Override
public String hierarchyMarkerType() {
return "org.apache.kafka.clients.Metadata";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;

// new - this instrumentation is completely new.
// the purpose of this class is to provide us with information on consumer group and cluster ID
Expand All @@ -14,6 +15,11 @@ public OffsetCommitCallbackInvokerInstrumentation() {
super("kafka");
}

@Override
public boolean isEnabled() {
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
}

@Override
public String instrumentedType() {
return "org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class KafkaClientCustomPropagationConfigTest extends AgentTestRunner {
super.configurePreAgent()

injectSysConfig("dd.kafka.e2e.duration.enabled", "true")
injectSysConfig("dd.trace.experimental.kafka.enabled","true")
}

@Flaky
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
super.configurePreAgent()

injectSysConfig("dd.kafka.e2e.duration.enabled", "true")
injectSysConfig("dd.trace.experimental.kafka.enabled","true")
}

public static final LinkedHashMap<String, String> PRODUCER_PATHWAY_EDGE_TAGS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,7 @@ public final class TraceInstrumentationConfig {
/** If set, the instrumentation will set its resource name on the local root too. */
public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name";

public static final String EXPERIMENTAL_KAFKA_ENABLED = "trace.experimental.kafka.enabled";

private TraceInstrumentationConfig() {}
}
8 changes: 8 additions & 0 deletions internal-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,9 @@ public static String getHostName() {
private final boolean kafkaClientPropagationEnabled;
private final Set<String> kafkaClientPropagationDisabledTopics;
private final boolean kafkaClientBase64DecodingEnabled;
// enable the Kafka-3.8 instrumentation manually until testing issues are resolved.
private final boolean experimentalKafkaEnabled;

private final boolean jmsPropagationEnabled;
private final Set<String> jmsPropagationDisabledTopics;
private final Set<String> jmsPropagationDisabledQueues;
Expand Down Expand Up @@ -1564,6 +1567,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
tryMakeImmutableSet(configProvider.getList(KAFKA_CLIENT_PROPAGATION_DISABLED_TOPICS));
kafkaClientBase64DecodingEnabled =
configProvider.getBoolean(KAFKA_CLIENT_BASE64_DECODING_ENABLED, false);
experimentalKafkaEnabled = configProvider.getBoolean(EXPERIMENTAL_KAFKA_ENABLED, false);
jmsPropagationEnabled = isPropagationEnabled(true, "jms");
jmsPropagationDisabledTopics =
tryMakeImmutableSet(configProvider.getList(JMS_PROPAGATION_DISABLED_TOPICS));
Expand Down Expand Up @@ -3045,6 +3049,10 @@ public boolean isKafkaClientBase64DecodingEnabled() {
return kafkaClientBase64DecodingEnabled;
}

public boolean isExperimentalKafkaEnabled() {
return experimentalKafkaEnabled;
}

public boolean isRabbitPropagationEnabled() {
return rabbitPropagationEnabled;
}
Expand Down

0 comments on commit d302fe3

Please sign in to comment.