Skip to content

Commit

Permalink
Merge pull request #1468 from openmeterio/refactor-collector-meter
Browse files Browse the repository at this point in the history
refactor: collector meter
  • Loading branch information
sagikazarmark authored Sep 3, 2024
2 parents bf010b7 + 6bcf3b2 commit 530f4a1
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 25 deletions.
11 changes: 9 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/openmeter/debug"
"github.com/openmeterio/openmeter/openmeter/ingest"
"github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter"
"github.com/openmeterio/openmeter/openmeter/ingest/ingestdriver"
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest"
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer"
Expand Down Expand Up @@ -573,17 +574,23 @@ func initKafkaProducer(ctx context.Context, config config.Configuration, logger
return producer, nil
}

func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer) (*kafkaingest.Collector, *kafkaingest.NamespaceHandler, error) {
func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer) (ingest.Collector, *kafkaingest.NamespaceHandler, error) {
var collector ingest.Collector

collector, err := kafkaingest.NewCollector(
producer,
serializer,
config.Ingest.Kafka.EventsTopicTemplate,
metricMeter,
)
if err != nil {
return nil, nil, fmt.Errorf("init kafka ingest: %w", err)
}

collector, err = ingestadapter.WithMetrics(collector, metricMeter)
if err != nil {
return nil, nil, fmt.Errorf("init kafka ingest: %w", err)
}

kafkaAdminClient, err := kafka.NewAdminClientFromProducer(producer)
if err != nil {
return nil, nil, err
Expand Down
69 changes: 69 additions & 0 deletions openmeter/ingest/ingestadapter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package ingestadapter

import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/event"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/openmeter/ingest"
)

// collectorMetrics emits metrics for ingested events.
type collectorMetrics struct {
collector ingest.Collector

ingestEventsCounter metric.Int64Counter
ingestErrorsCounter metric.Int64Counter
}

// WithMetrics wraps an [ingest.Collector] and emits metrics for ingested events.
func WithMetrics(collector ingest.Collector, metricMeter metric.Meter) (ingest.Collector, error) {
ingestEventsCounter, err := metricMeter.Int64Counter(
"ingest.events",
metric.WithDescription("Number of events ingested"),
metric.WithUnit("{event}"),
)
if err != nil {
return nil, fmt.Errorf("failed to create events counter: %w", err)
}

ingestErrorsCounter, err := metricMeter.Int64Counter(
"ingest.errors",
metric.WithDescription("Number of failed event ingests"),
metric.WithUnit("{error}"),
)
if err != nil {
return nil, fmt.Errorf("failed to create errors counter: %w", err)
}

return collectorMetrics{
collector: collector,

ingestEventsCounter: ingestEventsCounter,
ingestErrorsCounter: ingestErrorsCounter,
}, nil
}

// Ingest implements the [ingest.Collector] interface.
func (c collectorMetrics) Ingest(ctx context.Context, namespace string, ev event.Event) error {
namespaceAttr := attribute.String("namespace", namespace)

err := c.collector.Ingest(ctx, namespace, ev)
if err != nil {
c.ingestErrorsCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr))

return err
}

c.ingestEventsCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr))

return nil
}

// Close implements the [ingest.Collector] interface.
func (c collectorMetrics) Close() {
c.collector.Close()
}
23 changes: 0 additions & 23 deletions openmeter/ingest/kafkaingest/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/cloudevents/sdk-go/v2/event"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer"
kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics"
Expand All @@ -25,15 +23,12 @@ type Collector struct {
// NamespacedTopicTemplate needs to contain at least one string parameter passed to fmt.Sprintf.
// For example: "om_%s_events"
NamespacedTopicTemplate string

ingestEventCounter metric.Int64Counter
}

func NewCollector(
producer *kafka.Producer,
serializer serializer.Serializer,
namespacedTopicTemplate string,
metricMeter metric.Meter,
) (*Collector, error) {
if producer == nil {
return nil, fmt.Errorf("producer is required")
Expand All @@ -44,25 +39,11 @@ func NewCollector(
if namespacedTopicTemplate == "" {
return nil, fmt.Errorf("namespaced topic template is required")
}
if metricMeter == nil {
return nil, fmt.Errorf("metric meter is required")
}

// Initialize OTel metrics
ingestEventCounter, err := metricMeter.Int64Counter(
"ingest.events",
metric.WithDescription("The number of events ingested"),
metric.WithUnit("{event}"),
)
if err != nil {
return nil, fmt.Errorf("failed to create events counter: %w", err)
}

return &Collector{
Producer: producer,
Serializer: serializer,
NamespacedTopicTemplate: namespacedTopicTemplate,
ingestEventCounter: ingestEventCounter,
}, nil
}

Expand Down Expand Up @@ -96,10 +77,6 @@ func (s Collector) Ingest(ctx context.Context, namespace string, ev event.Event)
return fmt.Errorf("producing kafka message: %w", err)
}

// Increment the ingest event counter metric
namespaceAttr := attribute.String("namespace", namespace)
s.ingestEventCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr))

return nil
}

Expand Down

0 comments on commit 530f4a1

Please sign in to comment.