diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index 409a53f6d..efcd6afb3 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -267,12 +267,12 @@ func main() { Logger: logger, } - if conf.BalanceWorker.PoisionQueue.Enabled { - workerOptions.PoisonQueue = &balanceworker.WorkerPoisonQueueOptions{ - Topic: conf.BalanceWorker.PoisionQueue.Topic, - Throttle: conf.BalanceWorker.PoisionQueue.Throttle.Enabled, - ThrottleDuration: conf.BalanceWorker.PoisionQueue.Throttle.Duration, - ThrottleCount: conf.BalanceWorker.PoisionQueue.Throttle.Count, + if conf.BalanceWorker.DLQ.Enabled { + workerOptions.DLQ = &balanceworker.WorkerDLQOptions{ + Topic: conf.BalanceWorker.DLQ.Topic, + Throttle: conf.BalanceWorker.DLQ.Throttle.Enabled, + ThrottleDuration: conf.BalanceWorker.DLQ.Throttle.Duration, + ThrottleCount: conf.BalanceWorker.DLQ.Throttle.Count, } } @@ -361,7 +361,7 @@ type eventPublishers struct { func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) { eventDriver := watermillkafka.NewPublisher(kafkaProducer) - if conf.BalanceWorker.PoisionQueue.AutoProvision.Enabled { + if conf.BalanceWorker.DLQ.AutoProvision.Enabled { adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer) if err != nil { return nil, fmt.Errorf("failed to create Kafka admin client: %w", err) @@ -372,8 +372,8 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co if err := pkgkafka.ProvisionTopic(ctx, adminClient, logger, - conf.BalanceWorker.PoisionQueue.Topic, - conf.BalanceWorker.PoisionQueue.AutoProvision.Partitions); err != nil { + conf.BalanceWorker.DLQ.Topic, + conf.BalanceWorker.DLQ.AutoProvision.Partitions); err != nil { return nil, fmt.Errorf("failed to auto provision topic: %w", err) } } diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index e4b66465b..81b25bd01 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -264,12 +264,12 @@ func main() { Logger: logger, } - if conf.NotificationService.Consumer.PoisionQueue.Enabled { - consumerOptions.PoisonQueue = &consumer.PoisonQueueOptions{ - Topic: conf.NotificationService.Consumer.PoisionQueue.Topic, - Throttle: conf.NotificationService.Consumer.PoisionQueue.Throttle.Enabled, - ThrottleDuration: conf.NotificationService.Consumer.PoisionQueue.Throttle.Duration, - ThrottleCount: conf.NotificationService.Consumer.PoisionQueue.Throttle.Count, + if conf.NotificationService.Consumer.DLQ.Enabled { + consumerOptions.DLQ = &consumer.DLQOptions{ + Topic: conf.NotificationService.Consumer.DLQ.Topic, + Throttle: conf.NotificationService.Consumer.DLQ.Throttle.Enabled, + ThrottleDuration: conf.NotificationService.Consumer.DLQ.Throttle.Duration, + ThrottleCount: conf.NotificationService.Consumer.DLQ.Throttle.Count, } } @@ -358,7 +358,7 @@ type eventPublishers struct { func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) { eventDriver := watermillkafka.NewPublisher(kafkaProducer) - if conf.NotificationService.Consumer.PoisionQueue.AutoProvision.Enabled { + if conf.NotificationService.Consumer.DLQ.AutoProvision.Enabled { adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer) if err != nil { return nil, fmt.Errorf("failed to create Kafka admin client: %w", err) @@ -369,8 +369,8 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co if err := pkgkafka.ProvisionTopic(ctx, adminClient, logger, - conf.NotificationService.Consumer.PoisionQueue.Topic, - conf.NotificationService.Consumer.PoisionQueue.AutoProvision.Partitions); err != nil { + conf.NotificationService.Consumer.DLQ.Topic, + conf.NotificationService.Consumer.DLQ.AutoProvision.Partitions); err != nil { return nil, fmt.Errorf("failed to auto provision topic: %w", err) } } diff --git a/config/balanceworker.go b/config/balanceworker.go index 0ccf6ef47..c4bee9b01 100644 --- a/config/balanceworker.go +++ b/config/balanceworker.go @@ -9,13 +9,13 @@ import ( ) type BalanceWorkerConfiguration struct { - PoisionQueue PoisionQueueConfiguration + DLQ DLQConfiguration Retry RetryConfiguration ConsumerGroupName string } func (c BalanceWorkerConfiguration) Validate() error { - if err := c.PoisionQueue.Validate(); err != nil { + if err := c.DLQ.Validate(); err != nil { return fmt.Errorf("poision queue: %w", err) } @@ -30,15 +30,15 @@ func (c BalanceWorkerConfiguration) Validate() error { } func ConfigureBalanceWorker(v *viper.Viper) { - v.SetDefault("balanceWorker.poisionQueue.enabled", true) - v.SetDefault("balanceWorker.poisionQueue.topic", "om_sys.balance_worker_poision") - v.SetDefault("balanceWorker.poisionQueue.autoProvision.enabled", true) - v.SetDefault("balanceWorker.poisionQueue.autoProvision.partitions", 1) + v.SetDefault("balanceWorker.dlq.enabled", true) + v.SetDefault("balanceWorker.dlq.topic", "om_sys.balance_worker_dlq") + v.SetDefault("balanceWorker.dlq.autoProvision.enabled", true) + v.SetDefault("balanceWorker.dlq.autoProvision.partitions", 1) - v.SetDefault("balanceWorker.poisionQueue.throttle.enabled", true) + v.SetDefault("balanceWorker.dlq.throttle.enabled", true) // Let's throttle poision queue to 10 messages per second - v.SetDefault("balanceWorker.poisionQueue.throttle.count", 10) - v.SetDefault("balanceWorker.poisionQueue.throttle.duration", time.Second) + v.SetDefault("balanceWorker.dlq.throttle.count", 10) + v.SetDefault("balanceWorker.dlq.throttle.duration", time.Second) v.SetDefault("balanceWorker.retry.maxRetries", 5) v.SetDefault("balanceWorker.retry.initialInterval", 100*time.Millisecond) diff --git a/config/config_test.go b/config/config_test.go index 38db0c2a5..094db026a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -193,9 +193,9 @@ func TestComplete(t *testing.T) { }, }, BalanceWorker: BalanceWorkerConfiguration{ - PoisionQueue: PoisionQueueConfiguration{ + DLQ: DLQConfiguration{ Enabled: true, - Topic: "om_sys.balance_worker_poision", + Topic: "om_sys.balance_worker_dlq", AutoProvision: AutoProvisionConfiguration{ Enabled: true, Partitions: 1, @@ -214,9 +214,9 @@ func TestComplete(t *testing.T) { }, NotificationService: NotificationServiceConfiguration{ Consumer: NotificationServiceConsumerConfiguration{ - PoisionQueue: PoisionQueueConfiguration{ + DLQ: DLQConfiguration{ Enabled: true, - Topic: "om_sys.notification_service_poision", + Topic: "om_sys.notification_service_dlq", AutoProvision: AutoProvisionConfiguration{ Enabled: true, Partitions: 1, diff --git a/config/events.go b/config/events.go index 7d48a4284..dfd252753 100644 --- a/config/events.go +++ b/config/events.go @@ -48,14 +48,14 @@ func (c AutoProvisionConfiguration) Validate() error { return nil } -type PoisionQueueConfiguration struct { +type DLQConfiguration struct { Enabled bool Topic string AutoProvision AutoProvisionConfiguration Throttle ThrottleConfiguration } -func (c PoisionQueueConfiguration) Validate() error { +func (c DLQConfiguration) Validate() error { if !c.Enabled { return nil } diff --git a/config/notificationservice.go b/config/notificationservice.go index 24a2b29c0..e4f604f69 100644 --- a/config/notificationservice.go +++ b/config/notificationservice.go @@ -13,7 +13,7 @@ type NotificationServiceConfiguration struct { } type NotificationServiceConsumerConfiguration struct { - PoisionQueue PoisionQueueConfiguration + DLQ DLQConfiguration Retry RetryConfiguration ConsumerGroupName string } @@ -26,7 +26,7 @@ func (c NotificationServiceConfiguration) Validate() error { } func (c NotificationServiceConsumerConfiguration) Validate() error { - if err := c.PoisionQueue.Validate(); err != nil { + if err := c.DLQ.Validate(); err != nil { return fmt.Errorf("poision queue: %w", err) } @@ -41,15 +41,15 @@ func (c NotificationServiceConsumerConfiguration) Validate() error { } func ConfigureNotificationService(v *viper.Viper) { - v.SetDefault("notificationService.consumer.poisionQueue.enabled", true) - v.SetDefault("notificationService.consumer.poisionQueue.topic", "om_sys.notification_service_poision") - v.SetDefault("notificationService.consumer.poisionQueue.autoProvision.enabled", true) - v.SetDefault("notificationService.consumer.poisionQueue.autoProvision.partitions", 1) + v.SetDefault("notificationService.consumer.dlq.enabled", true) + v.SetDefault("notificationService.consumer.dlq.topic", "om_sys.notification_service_dlq") + v.SetDefault("notificationService.consumer.dlq.autoProvision.enabled", true) + v.SetDefault("notificationService.consumer.dlq.autoProvision.partitions", 1) - v.SetDefault("notificationService.consumer.poisionQueue.throttle.enabled", true) + v.SetDefault("notificationService.consumer.dlq.throttle.enabled", true) // Let's throttle poision queue to 10 messages per second - v.SetDefault("notificationService.consumer.poisionQueue.throttle.count", 10) - v.SetDefault("notificationService.consumer.poisionQueue.throttle.duration", time.Second) + v.SetDefault("notificationService.consumer.dlq.throttle.count", 10) + v.SetDefault("notificationService.consumer.dlq.throttle.duration", time.Second) v.SetDefault("notificationService.consumer.retry.maxRetries", 5) v.SetDefault("notificationService.consumer.retry.initialInterval", 100*time.Millisecond) diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index 0bcdacebc..8d099f930 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -42,7 +42,7 @@ type WorkerOptions struct { Subscriber message.Subscriber TargetTopic string - PoisonQueue *WorkerPoisonQueueOptions + DLQ *WorkerDLQOptions Publisher message.Publisher Marshaler publisher.CloudEventMarshaler @@ -53,7 +53,7 @@ type WorkerOptions struct { Logger *slog.Logger } -type WorkerPoisonQueueOptions struct { +type WorkerDLQOptions struct { Topic string Throttle bool ThrottleDuration time.Duration @@ -121,8 +121,8 @@ func New(opts WorkerOptions) (*Worker, error) { middleware.Recoverer, ) - if opts.PoisonQueue != nil { - poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.PoisonQueue.Topic) + if opts.DLQ != nil { + poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.DLQ.Topic) if err != nil { return nil, err } @@ -132,15 +132,15 @@ func New(opts WorkerOptions) (*Worker, error) { ) poisionQueueProcessor := worker.handleEvent - if opts.PoisonQueue.Throttle { + if opts.DLQ.Throttle { poisionQueueProcessor = middleware.NewThrottle( - opts.PoisonQueue.ThrottleCount, - opts.PoisonQueue.ThrottleDuration, + opts.DLQ.ThrottleCount, + opts.DLQ.ThrottleDuration, ).Middleware(poisionQueueProcessor) } router.AddHandler( "balance_worker_process_poison_queue", - opts.PoisonQueue.Topic, + opts.DLQ.Topic, opts.Subscriber, opts.TargetTopic, opts.Publisher, diff --git a/internal/notification/consumer/consumer.go b/internal/notification/consumer/consumer.go index 493c18045..65d19f37d 100644 --- a/internal/notification/consumer/consumer.go +++ b/internal/notification/consumer/consumer.go @@ -23,14 +23,14 @@ type Options struct { Publisher message.Publisher - PoisonQueue *PoisonQueueOptions + DLQ *DLQOptions Entitlement *registry.Entitlement Logger *slog.Logger } -type PoisonQueueOptions struct { +type DLQOptions struct { Topic string Throttle bool ThrottleDuration time.Duration @@ -71,8 +71,8 @@ func New(opts Options) (*Consumer, error) { middleware.Recoverer, ) - if opts.PoisonQueue != nil { - poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.PoisonQueue.Topic) + if opts.DLQ != nil { + poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.DLQ.Topic) if err != nil { return nil, err } @@ -82,15 +82,15 @@ func New(opts Options) (*Consumer, error) { ) poisionQueueProcessor := nopublisher.NoPublisherHandlerToHandlerFunc(consumer.handleSystemEvent) - if opts.PoisonQueue.Throttle { + if opts.DLQ.Throttle { poisionQueueProcessor = middleware.NewThrottle( - opts.PoisonQueue.ThrottleCount, - opts.PoisonQueue.ThrottleDuration, + opts.DLQ.ThrottleCount, + opts.DLQ.ThrottleDuration, ).Middleware(poisionQueueProcessor) } router.AddNoPublisherHandler( "balance_consumer_process_poison_queue", - opts.PoisonQueue.Topic, + opts.DLQ.Topic, opts.Subscriber, nopublisher.HandlerFuncToNoPublisherHandler(poisionQueueProcessor), )