diff --git a/.vscode/launch.json b/.vscode/launch.json index 71bc4d195..165fa23c2 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -41,6 +41,19 @@ "--telemetry-address", ":10002" ] + }, + { + "name": "Launch Notification Service", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/cmd/notification-service", + "args": [ + "--config", + "${workspaceFolder}/config.yaml", + "--telemetry-address", + ":10003" + ] + } ] } diff --git a/Dockerfile b/Dockerfile index 00a79eb6d..5dfef4db1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -39,6 +39,10 @@ RUN xx-verify /usr/local/bin/openmeter-sink-worker RUN go build -ldflags "-linkmode external -extldflags \"-static\" -X main.version=${VERSION}" -tags musl -o /usr/local/bin/openmeter-balance-worker ./cmd/balance-worker RUN xx-verify /usr/local/bin/openmeter-balance-worker +# Build notification-service binary +RUN go build -ldflags "-linkmode external -extldflags \"-static\" -X main.version=${VERSION}" -tags musl -o /usr/local/bin/openmeter-notification-service ./cmd/notification-service +RUN xx-verify /usr/local/bin/openmeter-notification-service + FROM gcr.io/distroless/base-debian11:latest@sha256:ac69aa622ea5dcbca0803ca877d47d069f51bd4282d5c96977e0390d7d256455 AS distroless @@ -47,6 +51,7 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /usr/local/bin/openmeter /usr/local/bin/ COPY --from=builder /usr/local/bin/openmeter-sink-worker /usr/local/bin/ COPY --from=builder /usr/local/bin/openmeter-balance-worker /usr/local/bin/ +COPY --from=builder /usr/local/bin/openmeter-notification-service /usr/local/bin/ COPY --from=builder /usr/local/src/openmeter/go.* /usr/local/src/openmeter/ CMD openmeter @@ -58,6 +63,7 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /usr/local/bin/openmeter /usr/local/bin/ COPY --from=builder /usr/local/bin/openmeter-sink-worker /usr/local/bin/ COPY --from=builder 
/usr/local/bin/openmeter-balance-worker /usr/local/bin/ +COPY --from=builder /usr/local/bin/openmeter-notification-service /usr/local/bin/ COPY --from=builder /usr/local/src/openmeter/go.* /usr/local/src/openmeter/ CMD openmeter @@ -71,6 +77,7 @@ SHELL ["/bin/bash", "-c"] COPY --from=builder /usr/local/bin/openmeter /usr/local/bin/ COPY --from=builder /usr/local/bin/openmeter-sink-worker /usr/local/bin/ COPY --from=builder /usr/local/bin/openmeter-balance-worker /usr/local/bin/ +COPY --from=builder /usr/local/bin/openmeter-notification-service /usr/local/bin/ COPY --from=builder /usr/local/src/openmeter/go.* /usr/local/src/openmeter/ CMD openmeter diff --git a/Makefile b/Makefile index e09af6b3e..bac56e2c6 100644 --- a/Makefile +++ b/Makefile @@ -43,6 +43,11 @@ build-balance-worker: ## Build balance-worker binary $(call print-target) go build -o build/balance-worker ./cmd/balance-worker +.PHONY: build-notification-service +build-notification-service: ## Build notification-service binary + $(call print-target) + go build -o build/notification-service ./cmd/notification-service + config.yaml: cp config.example.yaml config.yaml @@ -64,6 +69,12 @@ balance-worker: ## Run balance-worker $(call print-target) air -c ./cmd/balance-worker/.air.toml +.PHONY: notification-service +notification-service: ## Run notification-service + @ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi + $(call print-target) + air -c ./cmd/notification-service/.air.toml + .PHONY: etoe etoe: ## Run e2e tests $(call print-target) diff --git a/cmd/notification-service/.air.toml b/cmd/notification-service/.air.toml new file mode 100644 index 000000000..f3c71a30e --- /dev/null +++ b/cmd/notification-service/.air.toml @@ -0,0 +1,44 @@ +root = "." 
+testdata_dir = "testdata" +tmp_dir = "tmp" + +[build] + args_bin = ["--config", "./config.yaml", "--telemetry-address", ":10003"] + bin = "./tmp/openmeter-notification-service" + cmd = "go build -o ./tmp/openmeter-notification-service ./cmd/notification-service" + delay = 0 + exclude_dir = ["assets", "ci", "deploy", "docs", "examples", "testdata", "quickstart", "tmp", "vendor", "api/client", "node_modules"] + exclude_file = [] + exclude_regex = ["_test.go"] + exclude_unchanged = false + follow_symlink = false + full_bin = "" + include_dir = [] + include_ext = ["go", "tpl", "tmpl", "html", "yml", "yaml", "sql", "json"] + include_file = [] + kill_delay = "0s" + log = "build-errors.log" + poll = false + poll_interval = 0 + rerun = false + rerun_delay = 500 + send_interrupt = true + stop_on_error = false + +[color] + app = "" + build = "yellow" + main = "magenta" + runner = "green" + watcher = "cyan" + +[log] + main_only = false + time = false + +[misc] + clean_on_exit = false + +[screen] + clear_on_rebuild = false + keep_scroll = true diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go new file mode 100644 index 000000000..e4b66465b --- /dev/null +++ b/cmd/notification-service/main.go @@ -0,0 +1,438 @@ +package main + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "syscall" + "time" + + "entgo.io/ent/dialect/sql" + health "github.com/AppsFlyer/go-sundheit" + healthhttp "github.com/AppsFlyer/go-sundheit/http" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/IBM/sarama" + "github.com/ThreeDotsLabs/watermill" + wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/go-slog/otelslog" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" + slogmulti 
"github.com/samber/slog-multi" + "github.com/spf13/pflag" + "github.com/spf13/viper" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + + "github.com/openmeterio/openmeter/config" + "github.com/openmeterio/openmeter/internal/ent/db" + "github.com/openmeterio/openmeter/internal/event/publisher" + "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" + "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/internal/notification/consumer" + "github.com/openmeterio/openmeter/internal/registry" + "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" + watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" + "github.com/openmeterio/openmeter/pkg/contextx" + "github.com/openmeterio/openmeter/pkg/framework/entutils" + "github.com/openmeterio/openmeter/pkg/framework/operation" + "github.com/openmeterio/openmeter/pkg/gosundheit" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" + kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" + "github.com/openmeterio/openmeter/pkg/models" + "github.com/openmeterio/openmeter/pkg/slicesx" +) + +const ( + defaultShutdownTimeout = 5 * time.Second + otelName = "openmeter.io/backend" +) + +func main() { + v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError) + ctx := context.Background() + + config.SetViperDefaults(v, flags) + + flags.String("config", "", "Configuration file") + flags.Bool("version", false, "Show version information") + + _ = flags.Parse(os.Args[1:]) + + if v, _ := flags.GetBool("version"); v { + fmt.Printf("%s version %s (%s) built on %s\n", "Open Meter", version, revision, revisionDate) + + os.Exit(0) + } + + if c, _ := flags.GetString("config"); c != "" { + v.SetConfigFile(c) + } + + err := 
v.ReadInConfig() + if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) { + panic(err) + } + + var conf config.Configuration + err = v.Unmarshal(&conf) + if err != nil { + panic(err) + } + + err = conf.Validate() + if err != nil { + panic(err) + } + + extraResources, _ := resource.New( + context.Background(), + resource.WithContainer(), + resource.WithAttributes( + semconv.ServiceName("openmeter"), + semconv.ServiceVersion(version), + semconv.DeploymentEnvironment(conf.Environment), + ), + ) + res, _ := resource.Merge( + resource.Default(), + extraResources, + ) + + logger := slog.New(slogmulti.Pipe( + otelslog.NewHandler, + contextx.NewLogHandler, + operation.NewLogHandler, + ).Handler(conf.Telemetry.Log.NewHandler(os.Stdout))) + logger = otelslog.WithResource(logger, res) + + slog.SetDefault(logger) + + telemetryRouter := chi.NewRouter() + telemetryRouter.Mount("/debug", middleware.Profiler()) + + // Initialize OTel Metrics + otelMeterProvider, err := conf.Telemetry.Metrics.NewMeterProvider(ctx, res) + if err != nil { + logger.Error("failed to initialize OpenTelemetry Metrics provider", slog.String("error", err.Error())) + os.Exit(1) + } + defer func() { + // Use dedicated context with timeout for shutdown as parent context might be canceled + // by the time the execution reaches this stage. 
+ ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) + defer cancel() + + if err := otelMeterProvider.Shutdown(ctx); err != nil { + logger.Error("shutting down meter provider", slog.String("error", err.Error())) + } + }() + otel.SetMeterProvider(otelMeterProvider) + metricMeter := otelMeterProvider.Meter(otelName) + + if conf.Telemetry.Metrics.Exporters.Prometheus.Enabled { + telemetryRouter.Handle("/metrics", promhttp.Handler()) + } + + // Initialize OTel Tracer + otelTracerProvider, err := conf.Telemetry.Trace.NewTracerProvider(ctx, res) + if err != nil { + logger.Error("failed to initialize OpenTelemetry Trace provider", slog.String("error", err.Error())) + os.Exit(1) + } + defer func() { + // Use dedicated context with timeout for shutdown as parent context might be canceled + // by the time the execution reaches this stage. + ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) + defer cancel() + + if err := otelTracerProvider.Shutdown(ctx); err != nil { + logger.Error("shutting down tracer provider", slog.String("error", err.Error())) + } + }() + + otel.SetTracerProvider(otelTracerProvider) + otel.SetTextMapPropagator(propagation.TraceContext{}) + + // Validate service prerequisites + + if !conf.Events.Enabled { + logger.Error("events are disabled, exiting") + os.Exit(1) + } + + // Configure health checker + healthChecker := health.New(health.WithCheckListeners(gosundheit.NewLogger(logger.With(slog.String("component", "healthcheck"))))) + { + handler := healthhttp.HandleHealthJSON(healthChecker) + telemetryRouter.Handle("/healthz", handler) + + // Kubernetes style health checks + telemetryRouter.HandleFunc("/healthz/live", func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte("ok")) + }) + telemetryRouter.Handle("/healthz/ready", handler) + } + + var group run.Group + + // Initialize the data sources (entitlements, productcatalog, etc.) 
+ // Dependencies: meters + meterRepository := meter.NewInMemoryRepository(slicesx.Map(conf.Meters, func(meter *models.Meter) models.Meter { + return *meter + })) + + // Dependencies: clickhouse + clickHouseClient, err := clickhouse.Open(conf.Aggregation.ClickHouse.GetClientOptions()) + if err != nil { + logger.Error("failed to initialize clickhouse client", "error", err) + os.Exit(1) + } + + // Dependencies: streamingConnector + clickhouseStreamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{ + Logger: logger, + ClickHouse: clickHouseClient, + Database: conf.Aggregation.ClickHouse.Database, + Meters: meterRepository, + CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter, + PopulateMeter: conf.Aggregation.PopulateMeter, + }) + if err != nil { + logger.Error("failed to initialize clickhouse aggregation", "error", err) + os.Exit(1) + } + + // Dependencies: postgresql + pgClients, err := initPGClients(conf.Postgres) + if err != nil { + logger.Error("failed to initialize postgres clients", "error", err) + os.Exit(1) + } + defer pgClients.driver.Close() + + logger.Info("Postgres clients initialized") + + // Create subscriber + wmSubscriber, err := initKafkaSubscriber(conf, logger) + if err != nil { + logger.Error("failed to initialize Kafka subscriber", slog.String("error", err.Error())) + os.Exit(1) + } + + // Create publisher + kafkaPublisher, err := initKafkaProducer(ctx, conf, logger, metricMeter, &group) + if err != nil { + logger.Error("failed to initialize Kafka producer", slog.String("error", err.Error())) + os.Exit(1) + } + + publishers, err := initEventPublisher(ctx, logger, conf, kafkaPublisher) + if err != nil { + logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) + os.Exit(1) + } + + // Dependencies: entitlement + entitlementConnectors := registry.GetEntitlementRegistry(registry.EntitlementOptions{ + DatabaseClient: pgClients.client, + 
StreamingConnector: clickhouseStreamingConnector, + MeterRepository: meterRepository, + Logger: logger, + Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + }) + + // Initialize consumer + consumerOptions := consumer.Options{ + SystemEventsTopic: conf.Events.SystemEvents.Topic, + Subscriber: wmSubscriber, + + Publisher: publishers.watermillPublisher, + + Entitlement: entitlementConnectors, + + Logger: logger, + } + + if conf.NotificationService.Consumer.PoisionQueue.Enabled { + consumerOptions.PoisonQueue = &consumer.PoisonQueueOptions{ + Topic: conf.NotificationService.Consumer.PoisionQueue.Topic, + Throttle: conf.NotificationService.Consumer.PoisionQueue.Throttle.Enabled, + ThrottleDuration: conf.NotificationService.Consumer.PoisionQueue.Throttle.Duration, + ThrottleCount: conf.NotificationService.Consumer.PoisionQueue.Throttle.Count, + } + } + + notificationConsumer, err := consumer.New(consumerOptions) + if err != nil { + logger.Error("failed to initialize worker", slog.String("error", err.Error())) + os.Exit(1) + } + + // Run worker components + + // Telemetry server + server := &http.Server{ + Addr: conf.Telemetry.Address, + Handler: telemetryRouter, + } + defer server.Close() + + group.Add( + func() error { return server.ListenAndServe() }, + func(err error) { _ = server.Shutdown(ctx) }, + ) + + // Notification service consumer + group.Add( + func() error { return notificationConsumer.Run(ctx) }, + func(err error) { _ = notificationConsumer.Close() }, + ) + + // Handle shutdown + group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM)) + + // Run the group + err = group.Run() + if e := (run.SignalError{}); errors.As(err, &e) { + slog.Info("received signal; shutting down", slog.String("signal", e.Signal.String())) + } else if !errors.Is(err, http.ErrServerClosed) { + logger.Error("application stopped due to error", slog.String("error", err.Error())) + } +} + +func initKafkaSubscriber(conf config.Configuration, logger 
*slog.Logger) (message.Subscriber, error) { + wmConfig := wmkafka.SubscriberConfig{ + Brokers: []string{conf.Ingest.Kafka.Broker}, + OverwriteSaramaConfig: sarama.NewConfig(), + ConsumerGroup: conf.NotificationService.Consumer.ConsumerGroupName, + ReconnectRetrySleep: 100 * time.Millisecond, + Unmarshaler: wmkafka.DefaultMarshaler{}, + } + + wmConfig.OverwriteSaramaConfig.Metadata.RefreshFrequency = conf.Ingest.Kafka.TopicMetadataRefreshInterval.Duration() + wmConfig.OverwriteSaramaConfig.ClientID = "openmeter/notification-service" + + switch conf.Ingest.Kafka.SecurityProtocol { + case "SASL_SSL": + wmConfig.OverwriteSaramaConfig.Net.SASL.Enable = true + wmConfig.OverwriteSaramaConfig.Net.SASL.User = conf.Ingest.Kafka.SaslUsername + wmConfig.OverwriteSaramaConfig.Net.SASL.Password = conf.Ingest.Kafka.SaslPassword + wmConfig.OverwriteSaramaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(conf.Ingest.Kafka.SecurityProtocol) + wmConfig.OverwriteSaramaConfig.Net.TLS.Enable = true + wmConfig.OverwriteSaramaConfig.Net.TLS.Config = &tls.Config{} + default: + } + + if err := wmConfig.Validate(); err != nil { + logger.Error("failed to validate Kafka subscriber configuration", slog.String("error", err.Error())) + return nil, err + } + + // Initialize Kafka subscriber + subscriber, err := wmkafka.NewSubscriber(wmConfig, watermill.NewSlogLogger(logger)) + if err != nil { + logger.Error("failed to initialize Kafka subscriber", slog.String("error", err.Error())) + return nil, err + } + + return subscriber, nil +} + +type eventPublishers struct { + watermillPublisher message.Publisher + marshaler publisher.CloudEventMarshaler + eventPublisher publisher.Publisher +} + +func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) { + eventDriver := watermillkafka.NewPublisher(kafkaProducer) + + if conf.NotificationService.Consumer.PoisionQueue.AutoProvision.Enabled { + adminClient, err := 
kafka.NewAdminClientFromProducer(kafkaProducer) + if err != nil { + return nil, fmt.Errorf("failed to create Kafka admin client: %w", err) + } + + defer adminClient.Close() + + if err := pkgkafka.ProvisionTopic(ctx, + adminClient, + logger, + conf.NotificationService.Consumer.PoisionQueue.Topic, + conf.NotificationService.Consumer.PoisionQueue.AutoProvision.Partitions); err != nil { + return nil, fmt.Errorf("failed to auto provision topic: %w", err) + } + } + + eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ + Publisher: eventDriver, + Transform: watermillkafka.AddPartitionKeyFromSubject, + }) + if err != nil { + return nil, fmt.Errorf("failed to create event publisher: %w", err) + } + + return &eventPublishers{ + watermillPublisher: eventDriver, + marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject), + eventPublisher: eventPublisher, + }, nil +} + +func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) { + // Initialize Kafka Admin Client + kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() + + // Initialize Kafka Producer + producer, err := kafka.NewProducer(&kafkaConfig) + if err != nil { + return nil, fmt.Errorf("init kafka ingest: %w", err) + } + + // Initialize Kafka Client Statistics reporter + kafkaMetrics, err := kafkametrics.New(metricMeter) + if err != nil { + return nil, fmt.Errorf("failed to create Kafka client metrics: %w", err) + } + + group.Add(kafkaingest.KafkaProducerGroup(ctx, producer, logger, kafkaMetrics)) + + go pkgkafka.ConsumeLogChannel(producer, logger.WithGroup("kafka").WithGroup("producer")) + + slog.Debug("connected to Kafka") + return producer, nil +} + +type pgClients struct { + driver *sql.Driver + client *db.Client +} + +func initPGClients(config config.PostgresConfig) ( + *pgClients, + error, +) { + if err := config.Validate(); err != nil { + return nil, 
fmt.Errorf("invalid postgres config: %w", err) + } + driver, err := entutils.GetPGDriver(config.URL) + if err != nil { + return nil, fmt.Errorf("failed to init postgres driver: %w", err) + } + + return &pgClients{ + driver: driver, + client: db.NewClient(db.Driver(driver)), + }, nil +} diff --git a/cmd/notification-service/version.go b/cmd/notification-service/version.go new file mode 100644 index 000000000..24582ea13 --- /dev/null +++ b/cmd/notification-service/version.go @@ -0,0 +1,34 @@ +package main + +import "runtime/debug" + +// Provisioned by ldflags. +var version string + +//nolint:gochecknoglobals +var ( + revision string + revisionDate string +) + +//nolint:gochecknoinits,goconst +func init() { + if version == "" { + version = "unknown" + } + + buildInfo, _ := debug.ReadBuildInfo() + + revision = "unknown" + revisionDate = "unknown" + + for _, setting := range buildInfo.Settings { + if setting.Key == "vcs.revision" { + revision = setting.Value + } + + if setting.Key == "vcs.time" { + revisionDate = setting.Value + } + } +} diff --git a/config/config.go b/config/config.go index bb0518ece..6cf453e28 100644 --- a/config/config.go +++ b/config/config.go @@ -20,17 +20,18 @@ type Configuration struct { Telemetry TelemetryConfig - Aggregation AggregationConfiguration - Entitlements EntitlementsConfiguration - Dedupe DedupeConfiguration - Events EventsConfiguration - Ingest IngestConfiguration - Meters []*models.Meter - Namespace NamespaceConfiguration - Portal PortalConfiguration - Postgres PostgresConfig - Sink SinkConfiguration - BalanceWorker BalanceWorkerConfiguration + Aggregation AggregationConfiguration + Entitlements EntitlementsConfiguration + Dedupe DedupeConfiguration + Events EventsConfiguration + Ingest IngestConfiguration + Meters []*models.Meter + Namespace NamespaceConfiguration + Portal PortalConfiguration + Postgres PostgresConfig + Sink SinkConfiguration + BalanceWorker BalanceWorkerConfiguration + NotificationService 
NotificationServiceConfiguration } // Validate validates the configuration. @@ -93,6 +94,10 @@ func (c Configuration) Validate() error { return fmt.Errorf("balance worker: %w", err) } + if err := c.NotificationService.Validate(); err != nil { + return fmt.Errorf("notification service: %w", err) + } + return nil } @@ -126,4 +131,5 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) { ConfigurePortal(v) ConfigureEvents(v) ConfigureBalanceWorker(v) + ConfigureNotificationService(v) } diff --git a/config/config_test.go b/config/config_test.go index ae479ba73..38db0c2a5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -212,6 +212,28 @@ func TestComplete(t *testing.T) { }, ConsumerGroupName: "om_balance_worker", }, + NotificationService: NotificationServiceConfiguration{ + Consumer: NotificationServiceConsumerConfiguration{ + PoisionQueue: PoisionQueueConfiguration{ + Enabled: true, + Topic: "om_sys.notification_service_poision", + AutoProvision: AutoProvisionConfiguration{ + Enabled: true, + Partitions: 1, + }, + Throttle: ThrottleConfiguration{ + Enabled: true, + Count: 10, + Duration: time.Second, + }, + }, + Retry: RetryConfiguration{ + MaxRetries: 5, + InitialInterval: 100 * time.Millisecond, + }, + ConsumerGroupName: "om_notification_service", + }, + }, } assert.Equal(t, expected, actual) diff --git a/config/notificationservice.go b/config/notificationservice.go new file mode 100644 index 000000000..24a2b29c0 --- /dev/null +++ b/config/notificationservice.go @@ -0,0 +1,58 @@ +package config + +import ( + "errors" + "fmt" + "time" + + "github.com/spf13/viper" +) + +type NotificationServiceConfiguration struct { + Consumer NotificationServiceConsumerConfiguration +} + +type NotificationServiceConsumerConfiguration struct { + PoisionQueue PoisionQueueConfiguration + Retry RetryConfiguration + ConsumerGroupName string +} + +func (c NotificationServiceConfiguration) Validate() error { + if err := c.Consumer.Validate(); err != nil { + return 
fmt.Errorf("consumer: %w", err) + } + return nil +} + +func (c NotificationServiceConsumerConfiguration) Validate() error { + if err := c.PoisionQueue.Validate(); err != nil { + return fmt.Errorf("poision queue: %w", err) + } + + if err := c.Retry.Validate(); err != nil { + return fmt.Errorf("retry: %w", err) + } + + if c.ConsumerGroupName == "" { + return errors.New("consumer group name is required") + } + return nil +} + +func ConfigureNotificationService(v *viper.Viper) { + v.SetDefault("notificationService.consumer.poisionQueue.enabled", true) + v.SetDefault("notificationService.consumer.poisionQueue.topic", "om_sys.notification_service_poision") + v.SetDefault("notificationService.consumer.poisionQueue.autoProvision.enabled", true) + v.SetDefault("notificationService.consumer.poisionQueue.autoProvision.partitions", 1) + + v.SetDefault("notificationService.consumer.poisionQueue.throttle.enabled", true) + // Let's throttle poision queue to 10 messages per second + v.SetDefault("notificationService.consumer.poisionQueue.throttle.count", 10) + v.SetDefault("notificationService.consumer.poisionQueue.throttle.duration", time.Second) + + v.SetDefault("notificationService.consumer.retry.maxRetries", 5) + v.SetDefault("notificationService.consumer.retry.initialInterval", 100*time.Millisecond) + + v.SetDefault("notificationService.consumer.consumerGroupName", "om_notification_service") +} diff --git a/internal/notification/consumer/consumer.go b/internal/notification/consumer/consumer.go new file mode 100644 index 000000000..493c18045 --- /dev/null +++ b/internal/notification/consumer/consumer.go @@ -0,0 +1,153 @@ +package consumer + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + + "github.com/openmeterio/openmeter/internal/entitlement/snapshot" + 
"github.com/openmeterio/openmeter/internal/event/publisher" + "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/registry" + "github.com/openmeterio/openmeter/internal/watermill/nopublisher" +) + +type Options struct { + SystemEventsTopic string + Subscriber message.Subscriber + + Publisher message.Publisher + + PoisonQueue *PoisonQueueOptions + + Entitlement *registry.Entitlement + + Logger *slog.Logger +} + +type PoisonQueueOptions struct { + Topic string + Throttle bool + ThrottleDuration time.Duration + ThrottleCount int64 +} + +type Consumer struct { + opts Options + router *message.Router +} + +func New(opts Options) (*Consumer, error) { + router, err := message.NewRouter(message.RouterConfig{}, watermill.NewSlogLogger(opts.Logger)) + if err != nil { + return nil, err + } + + consumer := &Consumer{ + opts: opts, + } + + router.AddNoPublisherHandler( + "balance_consumer_system_events", + opts.SystemEventsTopic, + opts.Subscriber, + consumer.handleSystemEvent, + ) + + router.AddMiddleware( + middleware.CorrelationID, + + middleware.Retry{ + MaxRetries: 5, + InitialInterval: 100 * time.Millisecond, + Logger: watermill.NewSlogLogger(opts.Logger), + }.Middleware, + + middleware.Recoverer, + ) + + if opts.PoisonQueue != nil { + poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.PoisonQueue.Topic) + if err != nil { + return nil, err + } + + router.AddMiddleware( + poisionQueue, + ) + + poisionQueueProcessor := nopublisher.NoPublisherHandlerToHandlerFunc(consumer.handleSystemEvent) + if opts.PoisonQueue.Throttle { + poisionQueueProcessor = middleware.NewThrottle( + opts.PoisonQueue.ThrottleCount, + opts.PoisonQueue.ThrottleDuration, + ).Middleware(poisionQueueProcessor) + } + router.AddNoPublisherHandler( + "balance_consumer_process_poison_queue", + opts.PoisonQueue.Topic, + opts.Subscriber, + nopublisher.HandlerFuncToNoPublisherHandler(poisionQueueProcessor), + ) + } + + return &Consumer{ + opts: opts, + 
router: router, + }, nil +} + +func (w *Consumer) Run(ctx context.Context) error { + return w.router.Run(ctx) +} + +func (w *Consumer) Close() error { + return w.router.Close() +} + +func (w *Consumer) handleSystemEvent(msg *message.Message) error { + w.opts.Logger.Debug("received system event", w.messageToLogFields(msg)...) + + ceType, found := msg.Metadata[publisher.CloudEventsHeaderType] + if !found { + w.opts.Logger.Warn("missing CloudEvents type, ignoring message") + return nil + } + + switch ceType { + case snapshot.SnapshotEvent{}.Spec().Type(): + event, err := spec.ParseCloudEventFromBytes[snapshot.SnapshotEvent](msg.Payload) + if err != nil { + w.opts.Logger.Error("failed to parse entitlement created event", w.messageToLogFields(msg)...) + return err + } + + return w.handleSnapshotEvent(msg.Context(), event.Payload) + } + return nil +} + +func (w *Consumer) handleSnapshotEvent(_ context.Context, payload snapshot.SnapshotEvent) error { + w.opts.Logger.Info("handling entitlement snapshot event", slog.String("entitlement_id", payload.Entitlement.ID)) + + return nil +} + +func (w *Consumer) messageToLogFields(msg *message.Message) []any { + out := make([]any, 0, 3) + out = append(out, slog.String("message_uuid", msg.UUID)) + out = append(out, slog.String("message_payload", string(msg.Payload))) + + meta, err := json.Marshal(msg.Metadata) + if err != nil { + return out + } + + out = append(out, slog.String("message_metadata", string(meta))) + return out +} diff --git a/internal/watermill/nopublisher/nopublisher.go b/internal/watermill/nopublisher/nopublisher.go new file mode 100644 index 000000000..2114c7c16 --- /dev/null +++ b/internal/watermill/nopublisher/nopublisher.go @@ -0,0 +1,25 @@ +package nopublisher + +import ( + "errors" + + "github.com/ThreeDotsLabs/watermill/message" +) + +var ErrMessagesProduced = errors.New("messages produced by no publisher handler") + +func NoPublisherHandlerToHandlerFunc(h message.NoPublishHandlerFunc) message.HandlerFunc { + 
return func(message *message.Message) ([]*message.Message, error) { + return nil, h(message) + } +} + +func HandlerFuncToNoPublisherHandler(h message.HandlerFunc) message.NoPublishHandlerFunc { + return func(message *message.Message) error { + outMessages, err := h(message) + if len(outMessages) > 0 { + return ErrMessagesProduced + } + return err + } +}