Skip to content

Commit

Permalink
Add a process span to ConfluentKafka instrumentation (#1937)
Browse files Browse the repository at this point in the history
Co-authored-by: Liudmila Molkova <[email protected]>
  • Loading branch information
g7ed6e and lmolkova authored Sep 3, 2024
1 parent bc27d71 commit 544eb98
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 47 deletions.
2 changes: 1 addition & 1 deletion build/Common.props
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<OpenTelemetryCoreLatestVersion>[1.9.0,2.0)</OpenTelemetryCoreLatestVersion>
<OpenTelemetryCoreLatestPrereleaseVersion>[1.9.0-rc.1]</OpenTelemetryCoreLatestPrereleaseVersion>
<StackExchangeRedisPkgVer>[2.6.122,3.0)</StackExchangeRedisPkgVer>
<ConfluentKafkaPkgVer>[2.3.0,3.0)</ConfluentKafkaPkgVer>
<ConfluentKafkaPkgVer>[2.4.0,3.0)</ConfluentKafkaPkgVer>
<CassandraCSharpDriverPkgVer>[3.16.0,4.0)</CassandraCSharpDriverPkgVer>
<StyleCopAnalyzersPkgVer>[1.2.0-beta.507,2.0)</StyleCopAnalyzersPkgVer>
<SystemNetHttp>[4.3.4,)</SystemNetHttp>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>
Confluent.Kafka.OpenTelemetryConsumeResultExtensions
OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder<TKey, TValue>
OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder<TKey, TValue>.InstrumentedConsumerBuilder(System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string!, string!>>! config) -> void
OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>
Expand All @@ -6,6 +8,9 @@ OpenTelemetry.Metrics.MeterProviderBuilderExtensions
OpenTelemetry.Trace.TracerProviderBuilderExtensions
override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder<TKey, TValue>.Build() -> Confluent.Kafka.IConsumer<TKey, TValue>!
override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>.Build() -> Confluent.Kafka.IProducer<TKey, TValue>!
static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.ConsumeAndProcessMessageAsync<TKey, TValue>(this Confluent.Kafka.IConsumer<TKey, TValue>! consumer, Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>! handler) -> System.Threading.Tasks.ValueTask<Confluent.Kafka.ConsumeResult<TKey, TValue>?>
static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.ConsumeAndProcessMessageAsync<TKey, TValue>(this Confluent.Kafka.IConsumer<TKey, TValue>! consumer, Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>! handler, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<Confluent.Kafka.ConsumeResult<TKey, TValue>?>
static Confluent.Kafka.OpenTelemetryConsumeResultExtensions.TryExtractPropagationContext<TKey, TValue>(this Confluent.Kafka.ConsumeResult<TKey, TValue>! consumeResult, out OpenTelemetry.Context.Propagation.PropagationContext propagationContext) -> bool
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder<TKey, TValue>! consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation<TKey, TValue>(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder<TKey, TValue>? consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder!
Expand All @@ -18,3 +23,4 @@ static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstr
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>! producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation<TKey, TValue>(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder<TKey, TValue>? producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder!
virtual Confluent.Kafka.OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>.Invoke(Confluent.Kafka.ConsumeResult<TKey, TValue>! consumeResult, System.Diagnostics.Activity? activity, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ namespace OpenTelemetry.Instrumentation.ConfluentKafka;

internal static class ConfluentKafkaCommon
{
internal const string ReceiveOperationName = "receive";
internal const string ProcessOperationName = "process";
internal const string KafkaMessagingSystem = "kafka";
internal const string PublishOperationName = "publish";

internal static readonly string InstrumentationName = typeof(ConfluentKafkaCommon).Assembly.GetName().Name!;
internal static readonly string InstrumentationVersion = typeof(ConfluentKafkaCommon).Assembly.GetPackageVersion();
internal static readonly ActivitySource ActivitySource = new(InstrumentationName, InstrumentationVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using System.Text;
using Confluent.Kafka;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;
Expand All @@ -11,8 +10,6 @@ namespace OpenTelemetry.Instrumentation.ConfluentKafka;

internal class InstrumentedConsumer<TKey, TValue> : IConsumer<TKey, TValue>
{
private const string ReceiveOperationName = "receive";
private const string KafkaMessagingSystem = "kafka";
private readonly IConsumer<TKey, TValue> consumer;
private readonly ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options;

Expand Down Expand Up @@ -265,17 +262,6 @@ public void Close()
private static string FormatConsumeException(ConsumeException consumeException) =>
$"ConsumeException: {consumeException.Error}";

private static PropagationContext ExtractPropagationContext(Headers? headers)
=> Propagators.DefaultTextMapPropagator.Extract(default, headers, ExtractTraceContext);

private static IEnumerable<string> ExtractTraceContext(Headers? headers, string value)
{
if (headers?.TryGetLastBytes(value, out var bytes) == true)
{
yield return Encoding.UTF8.GetString(bytes);
}
}

private static ConsumeResult ExtractConsumeResult(ConsumeResult<TKey, TValue> result) => result switch
{
null => new ConsumeResult(null, null),
Expand All @@ -296,10 +282,10 @@ private static void GetTags(string topic, out TagList tags, int? partition = nul
{
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingOperation,
ReceiveOperationName),
ConfluentKafkaCommon.ReceiveOperationName),
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingSystem,
KafkaMessagingSystem),
ConfluentKafkaCommon.KafkaMessagingSystem),
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingDestinationName,
topic),
Expand Down Expand Up @@ -335,7 +321,7 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT
if (this.options.Traces)
{
PropagationContext propagationContext = consumeResult.Headers != null
? ExtractPropagationContext(consumeResult.Headers)
? OpenTelemetryConsumeResultExtensions.ExtractPropagationContext(consumeResult.Headers)
: default;

using Activity? activity = this.StartReceiveActivity(propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key);
Expand Down Expand Up @@ -364,8 +350,8 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT
private Activity? StartReceiveActivity(PropagationContext propagationContext, DateTimeOffset start, TopicPartitionOffset? topicPartitionOffset, object? key)
{
var spanName = string.IsNullOrEmpty(topicPartitionOffset?.Topic)
? ReceiveOperationName
: string.Concat(topicPartitionOffset!.Topic, " ", ReceiveOperationName);
? ConfluentKafkaCommon.ReceiveOperationName
: string.Concat(topicPartitionOffset!.Topic, " ", ConfluentKafkaCommon.ReceiveOperationName);

ActivityLink[] activityLinks = propagationContext.ActivityContext.IsValid()
? new[] { new ActivityLink(propagationContext.ActivityContext) }
Expand All @@ -374,13 +360,13 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT
Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default);
if (activity?.IsAllDataRequested == true)
{
activity.SetTag(SemanticConventions.AttributeMessagingSystem, KafkaMessagingSystem);
activity.SetTag(SemanticConventions.AttributeMessagingSystem, ConfluentKafkaCommon.KafkaMessagingSystem);
activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name);
activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topicPartitionOffset?.Topic);
activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, topicPartitionOffset?.Partition.Value);
activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageOffset, topicPartitionOffset?.Offset.Value);
activity.SetTag(SemanticConventions.AttributeMessagingKafkaConsumerGroup, this.GroupId);
activity.SetTag(SemanticConventions.AttributeMessagingOperation, ReceiveOperationName);
activity.SetTag(SemanticConventions.AttributeMessagingOperation, ConfluentKafkaCommon.ReceiveOperationName);
if (key != null)
{
activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageKey, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ public override IConsumer<TKey, TValue> Build()
Debug.Assert(this.Options != null, "Options should not be null.");

ConsumerConfig config = (ConsumerConfig)this.Config;
if (this.Options!.Metrics)
{
config.StatisticsIntervalMs ??= 1000;
}

var consumer = new InstrumentedConsumer<TKey, TValue>(base.Build(), this.Options);
var consumer = new InstrumentedConsumer<TKey, TValue>(base.Build(), this.Options!);
consumer.GroupId = config.GroupId;

return consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ namespace OpenTelemetry.Instrumentation.ConfluentKafka;

internal sealed class InstrumentedProducer<TKey, TValue> : IProducer<TKey, TValue>
{
private const string PublishOperationName = "publish";
private const string KafkaMessagingSystem = "kafka";

private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator;
private readonly IProducer<TKey, TValue> producer;
private readonly ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> options;
Expand Down Expand Up @@ -285,10 +282,10 @@ private static void GetTags(string topic, out TagList tags, int? partition = nul
{
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingOperation,
PublishOperationName),
ConfluentKafkaCommon.PublishOperationName),
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingSystem,
KafkaMessagingSystem),
ConfluentKafkaCommon.KafkaMessagingSystem),
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingDestinationName,
topic),
Expand Down Expand Up @@ -329,7 +326,7 @@ private static void RecordPublish(TopicPartition topicPartition, TimeSpan durati

private Activity? StartPublishActivity(DateTimeOffset start, string topic, Message<TKey, TValue> message, int? partition = null)
{
var spanName = string.Concat(topic, " ", PublishOperationName);
var spanName = string.Concat(topic, " ", ConfluentKafkaCommon.PublishOperationName);
var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(name: spanName, kind: ActivityKind.Producer, startTime: start);
if (activity == null)
{
Expand All @@ -338,10 +335,10 @@ private static void RecordPublish(TopicPartition topicPartition, TimeSpan durati

if (activity.IsAllDataRequested)
{
activity.SetTag(SemanticConventions.AttributeMessagingSystem, KafkaMessagingSystem);
activity.SetTag(SemanticConventions.AttributeMessagingSystem, ConfluentKafkaCommon.KafkaMessagingSystem);
activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name);
activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topic);
activity.SetTag(SemanticConventions.AttributeMessagingOperation, PublishOperationName);
activity.SetTag(SemanticConventions.AttributeMessagingOperation, ConfluentKafkaCommon.PublishOperationName);

if (message.Key != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ public override IProducer<TKey, TValue> Build()
{
Debug.Assert(this.Options != null, "Options should not be null.");

ProducerConfig config = (ProducerConfig)this.Config;
if (this.Options!.Metrics)
{
config.StatisticsIntervalMs ??= 1000;
}

return new InstrumentedProducer<TKey, TValue>(base.Build(), this.Options);
return new InstrumentedProducer<TKey, TValue>(base.Build(), this.Options!);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,4 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="$(MicrosoftExtensionsOptionsPkgVer)" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='net462'">
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
<PackageReference Include="System.Text.Json" Version="8.0.4" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;

namespace Confluent.Kafka;

/// <summary>
/// An asynchronous action to process the <see cref="ConsumeResult{TKey,TValue}"/>.
/// </summary>
/// <param name="consumeResult">The <see cref="ConsumeResult{TKey,TValue}"/>.</param>
/// <param name="activity">The <see cref="Activity"/>.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/>.</param>
/// <typeparam name="TKey">The type of key of the <see cref="ConsumeResult{TKey,TValue}"/>.</typeparam>
/// <typeparam name="TValue">The type of value of the <see cref="ConsumeResult{TKey,TValue}"/>.</typeparam>
/// <returns>A <see cref="ValueTask"/>.</returns>
public delegate ValueTask OpenTelemetryConsumeAndProcessMessageHandler<TKey, TValue>(
ConsumeResult<TKey, TValue> consumeResult,
Activity? activity,
CancellationToken cancellationToken = default);
Loading

0 comments on commit 544eb98

Please sign in to comment.