From e9ab54fcab19f13ca9840af0666b1d0c4a021ff6 Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Tue, 3 Sep 2024 12:23:34 +0200 Subject: [PATCH 1/2] feat: add separate ingest metrics collector Signed-off-by: Mark Sagi-Kazar --- openmeter/ingest/ingestadapter/metrics.go | 69 +++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 openmeter/ingest/ingestadapter/metrics.go diff --git a/openmeter/ingest/ingestadapter/metrics.go b/openmeter/ingest/ingestadapter/metrics.go new file mode 100644 index 000000000..650a73317 --- /dev/null +++ b/openmeter/ingest/ingestadapter/metrics.go @@ -0,0 +1,69 @@ +package ingestadapter + +import ( + "context" + "fmt" + + "github.com/cloudevents/sdk-go/v2/event" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/openmeterio/openmeter/openmeter/ingest" +) + +// collectorMetrics emits metrics for ingested events. +type collectorMetrics struct { + collector ingest.Collector + + ingestEventsCounter metric.Int64Counter + ingestErrorsCounter metric.Int64Counter +} + +// WithMetrics wraps an [ingest.Collector] and emits metrics for ingested events. +func WithMetrics(collector ingest.Collector, metricMeter metric.Meter) (ingest.Collector, error) { + ingestEventsCounter, err := metricMeter.Int64Counter( + "ingest.events", + metric.WithDescription("Number of events ingested"), + metric.WithUnit("{event}"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create events counter: %w", err) + } + + ingestErrorsCounter, err := metricMeter.Int64Counter( + "ingest.errors", + metric.WithDescription("Number of failed event ingests"), + metric.WithUnit("{error}"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create errors counter: %w", err) + } + + return collectorMetrics{ + collector: collector, + + ingestEventsCounter: ingestEventsCounter, + ingestErrorsCounter: ingestErrorsCounter, + }, nil +} + +// Ingest implements the [ingest.Collector] interface. +func (c collectorMetrics) Ingest(ctx context.Context, namespace string, ev event.Event) error { + namespaceAttr := attribute.String("namespace", namespace) + + err := c.collector.Ingest(ctx, namespace, ev) + if err != nil { + c.ingestErrorsCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr)) + + return err + } + + c.ingestEventsCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr)) + + return nil +} + +// Close implements the [ingest.Collector] interface. +func (c collectorMetrics) Close() { + c.collector.Close() +} From 6bcf3b2a405f16c0da4fcae1971be5f4223bfe29 Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Tue, 3 Sep 2024 12:33:31 +0200 Subject: [PATCH 2/2] refactor(ingest): use collector metrics middleware Signed-off-by: Mark Sagi-Kazar --- cmd/server/main.go | 11 +++++++++-- openmeter/ingest/kafkaingest/collector.go | 23 ----------------------- 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 0e9a5b1e1..5cf92a7aa 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -36,6 +36,7 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/openmeter/debug" "github.com/openmeterio/openmeter/openmeter/ingest" + "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" "github.com/openmeterio/openmeter/openmeter/ingest/ingestdriver" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" @@ -573,17 +574,23 @@ func initKafkaProducer(ctx context.Context, config config.Configuration, logger return producer, nil } -func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer) (*kafkaingest.Collector, *kafkaingest.NamespaceHandler, error) { +func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer) (ingest.Collector, *kafkaingest.NamespaceHandler, error) { + var collector ingest.Collector + collector, err := kafkaingest.NewCollector( producer, serializer, config.Ingest.Kafka.EventsTopicTemplate, - metricMeter, ) if err != nil { return nil, nil, fmt.Errorf("init kafka ingest: %w", err) } + collector, err = ingestadapter.WithMetrics(collector, metricMeter) + if err != nil { + return nil, nil, fmt.Errorf("init kafka ingest: %w", err) + } + kafkaAdminClient, err := kafka.NewAdminClientFromProducer(producer) if err != nil { return nil, nil, err diff --git a/openmeter/ingest/kafkaingest/collector.go b/openmeter/ingest/kafkaingest/collector.go index c6fb37968..e8daa9884 100644 --- a/openmeter/ingest/kafkaingest/collector.go +++ b/openmeter/ingest/kafkaingest/collector.go @@ -9,8 +9,6 @@ import ( "github.com/cloudevents/sdk-go/v2/event" "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" @@ -25,15 +23,12 @@ type Collector struct { // NamespacedTopicTemplate needs to contain at least one string parameter passed to fmt.Sprintf. // For example: "om_%s_events" NamespacedTopicTemplate string - - ingestEventCounter metric.Int64Counter } func NewCollector( producer *kafka.Producer, serializer serializer.Serializer, namespacedTopicTemplate string, - metricMeter metric.Meter, ) (*Collector, error) { if producer == nil { return nil, fmt.Errorf("producer is required") @@ -44,25 +39,11 @@ func NewCollector( if namespacedTopicTemplate == "" { return nil, fmt.Errorf("namespaced topic template is required") } - if metricMeter == nil { - return nil, fmt.Errorf("metric meter is required") - } - - // Initialize OTel metrics - ingestEventCounter, err := metricMeter.Int64Counter( - "ingest.events", - metric.WithDescription("The number of events ingested"), - metric.WithUnit("{event}"), - ) - if err != nil { - return nil, fmt.Errorf("failed to create events counter: %w", err) - } return &Collector{ Producer: producer, Serializer: serializer, NamespacedTopicTemplate: namespacedTopicTemplate, - ingestEventCounter: ingestEventCounter, }, nil } @@ -96,10 +77,6 @@ func (s Collector) Ingest(ctx context.Context, namespace string, ev event.Event) return fmt.Errorf("producing kafka message: %w", err) } - // Increment the ingest event counter metric - namespaceAttr := attribute.String("namespace", namespace) - s.ingestEventCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr)) - return nil }