Skip to content

Commit

Permalink
Merge pull request #1284 from openmeterio/refactor/rename-poision-queue
Browse files Browse the repository at this point in the history
refactor: rename poision queue to DLQ
  • Loading branch information
turip authored Aug 1, 2024
2 parents 41cbe74 + f00045d commit 38249ae
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 58 deletions.
18 changes: 9 additions & 9 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,12 @@ func main() {
Logger: logger,
}

if conf.BalanceWorker.PoisionQueue.Enabled {
workerOptions.PoisonQueue = &balanceworker.WorkerPoisonQueueOptions{
Topic: conf.BalanceWorker.PoisionQueue.Topic,
Throttle: conf.BalanceWorker.PoisionQueue.Throttle.Enabled,
ThrottleDuration: conf.BalanceWorker.PoisionQueue.Throttle.Duration,
ThrottleCount: conf.BalanceWorker.PoisionQueue.Throttle.Count,
if conf.BalanceWorker.DLQ.Enabled {
workerOptions.DLQ = &balanceworker.WorkerDLQOptions{
Topic: conf.BalanceWorker.DLQ.Topic,
Throttle: conf.BalanceWorker.DLQ.Throttle.Enabled,
ThrottleDuration: conf.BalanceWorker.DLQ.Throttle.Duration,
ThrottleCount: conf.BalanceWorker.DLQ.Throttle.Count,
}
}

Expand Down Expand Up @@ -361,7 +361,7 @@ type eventPublishers struct {
func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) {
eventDriver := watermillkafka.NewPublisher(kafkaProducer)

if conf.BalanceWorker.PoisionQueue.AutoProvision.Enabled {
if conf.BalanceWorker.DLQ.AutoProvision.Enabled {
adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka admin client: %w", err)
Expand All @@ -372,8 +372,8 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
if err := pkgkafka.ProvisionTopic(ctx,
adminClient,
logger,
conf.BalanceWorker.PoisionQueue.Topic,
conf.BalanceWorker.PoisionQueue.AutoProvision.Partitions); err != nil {
conf.BalanceWorker.DLQ.Topic,
conf.BalanceWorker.DLQ.AutoProvision.Partitions); err != nil {
return nil, fmt.Errorf("failed to auto provision topic: %w", err)
}
}
Expand Down
18 changes: 9 additions & 9 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,12 @@ func main() {
Logger: logger,
}

if conf.NotificationService.Consumer.PoisionQueue.Enabled {
consumerOptions.PoisonQueue = &consumer.PoisonQueueOptions{
Topic: conf.NotificationService.Consumer.PoisionQueue.Topic,
Throttle: conf.NotificationService.Consumer.PoisionQueue.Throttle.Enabled,
ThrottleDuration: conf.NotificationService.Consumer.PoisionQueue.Throttle.Duration,
ThrottleCount: conf.NotificationService.Consumer.PoisionQueue.Throttle.Count,
if conf.NotificationService.Consumer.DLQ.Enabled {
consumerOptions.DLQ = &consumer.DLQOptions{
Topic: conf.NotificationService.Consumer.DLQ.Topic,
Throttle: conf.NotificationService.Consumer.DLQ.Throttle.Enabled,
ThrottleDuration: conf.NotificationService.Consumer.DLQ.Throttle.Duration,
ThrottleCount: conf.NotificationService.Consumer.DLQ.Throttle.Count,
}
}

Expand Down Expand Up @@ -358,7 +358,7 @@ type eventPublishers struct {
func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) {
eventDriver := watermillkafka.NewPublisher(kafkaProducer)

if conf.NotificationService.Consumer.PoisionQueue.AutoProvision.Enabled {
if conf.NotificationService.Consumer.DLQ.AutoProvision.Enabled {
adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka admin client: %w", err)
Expand All @@ -369,8 +369,8 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
if err := pkgkafka.ProvisionTopic(ctx,
adminClient,
logger,
conf.NotificationService.Consumer.PoisionQueue.Topic,
conf.NotificationService.Consumer.PoisionQueue.AutoProvision.Partitions); err != nil {
conf.NotificationService.Consumer.DLQ.Topic,
conf.NotificationService.Consumer.DLQ.AutoProvision.Partitions); err != nil {
return nil, fmt.Errorf("failed to auto provision topic: %w", err)
}
}
Expand Down
18 changes: 9 additions & 9 deletions config/balanceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
)

type BalanceWorkerConfiguration struct {
PoisionQueue PoisionQueueConfiguration
DLQ DLQConfiguration
Retry RetryConfiguration
ConsumerGroupName string
}

func (c BalanceWorkerConfiguration) Validate() error {
if err := c.PoisionQueue.Validate(); err != nil {
if err := c.DLQ.Validate(); err != nil {
return fmt.Errorf("poision queue: %w", err)
}

Expand All @@ -30,15 +30,15 @@ func (c BalanceWorkerConfiguration) Validate() error {
}

func ConfigureBalanceWorker(v *viper.Viper) {
v.SetDefault("balanceWorker.poisionQueue.enabled", true)
v.SetDefault("balanceWorker.poisionQueue.topic", "om_sys.balance_worker_poision")
v.SetDefault("balanceWorker.poisionQueue.autoProvision.enabled", true)
v.SetDefault("balanceWorker.poisionQueue.autoProvision.partitions", 1)
v.SetDefault("balanceWorker.dlq.enabled", true)
v.SetDefault("balanceWorker.dlq.topic", "om_sys.balance_worker_dlq")
v.SetDefault("balanceWorker.dlq.autoProvision.enabled", true)
v.SetDefault("balanceWorker.dlq.autoProvision.partitions", 1)

v.SetDefault("balanceWorker.poisionQueue.throttle.enabled", true)
v.SetDefault("balanceWorker.dlq.throttle.enabled", true)
// Let's throttle poision queue to 10 messages per second
v.SetDefault("balanceWorker.poisionQueue.throttle.count", 10)
v.SetDefault("balanceWorker.poisionQueue.throttle.duration", time.Second)
v.SetDefault("balanceWorker.dlq.throttle.count", 10)
v.SetDefault("balanceWorker.dlq.throttle.duration", time.Second)

v.SetDefault("balanceWorker.retry.maxRetries", 5)
v.SetDefault("balanceWorker.retry.initialInterval", 100*time.Millisecond)
Expand Down
8 changes: 4 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ func TestComplete(t *testing.T) {
},
},
BalanceWorker: BalanceWorkerConfiguration{
PoisionQueue: PoisionQueueConfiguration{
DLQ: DLQConfiguration{
Enabled: true,
Topic: "om_sys.balance_worker_poision",
Topic: "om_sys.balance_worker_dlq",
AutoProvision: AutoProvisionConfiguration{
Enabled: true,
Partitions: 1,
Expand All @@ -214,9 +214,9 @@ func TestComplete(t *testing.T) {
},
NotificationService: NotificationServiceConfiguration{
Consumer: NotificationServiceConsumerConfiguration{
PoisionQueue: PoisionQueueConfiguration{
DLQ: DLQConfiguration{
Enabled: true,
Topic: "om_sys.notification_service_poision",
Topic: "om_sys.notification_service_dlq",
AutoProvision: AutoProvisionConfiguration{
Enabled: true,
Partitions: 1,
Expand Down
4 changes: 2 additions & 2 deletions config/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (c AutoProvisionConfiguration) Validate() error {
return nil
}

type PoisionQueueConfiguration struct {
type DLQConfiguration struct {
Enabled bool
Topic string
AutoProvision AutoProvisionConfiguration
Throttle ThrottleConfiguration
}

func (c PoisionQueueConfiguration) Validate() error {
func (c DLQConfiguration) Validate() error {
if !c.Enabled {
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions config/notificationservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type NotificationServiceConfiguration struct {
}

type NotificationServiceConsumerConfiguration struct {
PoisionQueue PoisionQueueConfiguration
DLQ DLQConfiguration
Retry RetryConfiguration
ConsumerGroupName string
}
Expand All @@ -26,7 +26,7 @@ func (c NotificationServiceConfiguration) Validate() error {
}

func (c NotificationServiceConsumerConfiguration) Validate() error {
if err := c.PoisionQueue.Validate(); err != nil {
if err := c.DLQ.Validate(); err != nil {
return fmt.Errorf("poision queue: %w", err)
}

Expand All @@ -41,15 +41,15 @@ func (c NotificationServiceConsumerConfiguration) Validate() error {
}

func ConfigureNotificationService(v *viper.Viper) {
v.SetDefault("notificationService.consumer.poisionQueue.enabled", true)
v.SetDefault("notificationService.consumer.poisionQueue.topic", "om_sys.notification_service_poision")
v.SetDefault("notificationService.consumer.poisionQueue.autoProvision.enabled", true)
v.SetDefault("notificationService.consumer.poisionQueue.autoProvision.partitions", 1)
v.SetDefault("notificationService.consumer.dlq.enabled", true)
v.SetDefault("notificationService.consumer.dlq.topic", "om_sys.notification_service_dlq")
v.SetDefault("notificationService.consumer.dlq.autoProvision.enabled", true)
v.SetDefault("notificationService.consumer.dlq.autoProvision.partitions", 1)

v.SetDefault("notificationService.consumer.poisionQueue.throttle.enabled", true)
v.SetDefault("notificationService.consumer.dlq.throttle.enabled", true)
// Let's throttle poision queue to 10 messages per second
v.SetDefault("notificationService.consumer.poisionQueue.throttle.count", 10)
v.SetDefault("notificationService.consumer.poisionQueue.throttle.duration", time.Second)
v.SetDefault("notificationService.consumer.dlq.throttle.count", 10)
v.SetDefault("notificationService.consumer.dlq.throttle.duration", time.Second)

v.SetDefault("notificationService.consumer.retry.maxRetries", 5)
v.SetDefault("notificationService.consumer.retry.initialInterval", 100*time.Millisecond)
Expand Down
16 changes: 8 additions & 8 deletions internal/entitlement/balanceworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type WorkerOptions struct {
Subscriber message.Subscriber

TargetTopic string
PoisonQueue *WorkerPoisonQueueOptions
DLQ *WorkerDLQOptions
Publisher message.Publisher
Marshaler publisher.CloudEventMarshaler

Expand All @@ -53,7 +53,7 @@ type WorkerOptions struct {
Logger *slog.Logger
}

type WorkerPoisonQueueOptions struct {
type WorkerDLQOptions struct {
Topic string
Throttle bool
ThrottleDuration time.Duration
Expand Down Expand Up @@ -121,8 +121,8 @@ func New(opts WorkerOptions) (*Worker, error) {
middleware.Recoverer,
)

if opts.PoisonQueue != nil {
poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.PoisonQueue.Topic)
if opts.DLQ != nil {
poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.DLQ.Topic)
if err != nil {
return nil, err
}
Expand All @@ -132,15 +132,15 @@ func New(opts WorkerOptions) (*Worker, error) {
)

poisionQueueProcessor := worker.handleEvent
if opts.PoisonQueue.Throttle {
if opts.DLQ.Throttle {
poisionQueueProcessor = middleware.NewThrottle(
opts.PoisonQueue.ThrottleCount,
opts.PoisonQueue.ThrottleDuration,
opts.DLQ.ThrottleCount,
opts.DLQ.ThrottleDuration,
).Middleware(poisionQueueProcessor)
}
router.AddHandler(
"balance_worker_process_poison_queue",
opts.PoisonQueue.Topic,
opts.DLQ.Topic,
opts.Subscriber,
opts.TargetTopic,
opts.Publisher,
Expand Down
16 changes: 8 additions & 8 deletions internal/notification/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ type Options struct {

Publisher message.Publisher

PoisonQueue *PoisonQueueOptions
DLQ *DLQOptions

Entitlement *registry.Entitlement

Logger *slog.Logger
}

type PoisonQueueOptions struct {
type DLQOptions struct {
Topic string
Throttle bool
ThrottleDuration time.Duration
Expand Down Expand Up @@ -71,8 +71,8 @@ func New(opts Options) (*Consumer, error) {
middleware.Recoverer,
)

if opts.PoisonQueue != nil {
poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.PoisonQueue.Topic)
if opts.DLQ != nil {
poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.DLQ.Topic)
if err != nil {
return nil, err
}
Expand All @@ -82,15 +82,15 @@ func New(opts Options) (*Consumer, error) {
)

poisionQueueProcessor := nopublisher.NoPublisherHandlerToHandlerFunc(consumer.handleSystemEvent)
if opts.PoisonQueue.Throttle {
if opts.DLQ.Throttle {
poisionQueueProcessor = middleware.NewThrottle(
opts.PoisonQueue.ThrottleCount,
opts.PoisonQueue.ThrottleDuration,
opts.DLQ.ThrottleCount,
opts.DLQ.ThrottleDuration,
).Middleware(poisionQueueProcessor)
}
router.AddNoPublisherHandler(
"balance_consumer_process_poison_queue",
opts.PoisonQueue.Topic,
opts.DLQ.Topic,
opts.Subscriber,
nopublisher.HandlerFuncToNoPublisherHandler(poisionQueueProcessor),
)
Expand Down

0 comments on commit 38249ae

Please sign in to comment.