From 0b0781901853e76b3cc8dcb2df8d4ca73e7ed00a Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Tue, 13 Aug 2024 10:12:30 +0200 Subject: [PATCH 1/2] feat: simplify dlq setup Given that most of our consumers are implicity depending on the ordering of the messages, this patch drops the retry queues and adds a simple DLQ solution. --- cmd/balance-worker/main.go | 2 +- cmd/notification-service/main.go | 2 +- config/balanceworker.go | 33 +-- config/config_test.go | 50 ++-- config/events.go | 99 +++++-- config/notification.go | 38 +-- go.mod | 1 + internal/entitlement/balanceworker/worker.go | 5 +- internal/notification/consumer/consumer.go | 24 +- internal/watermill/router/context.go | 19 ++ internal/watermill/router/router.go | 80 +++--- internal/watermill/router/router_test.go | 261 +++++++++++++++++++ openmeter/watermill/router/router.go | 4 +- 13 files changed, 443 insertions(+), 175 deletions(-) create mode 100644 internal/watermill/router/context.go create mode 100644 internal/watermill/router/router_test.go diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index 729af91ec..aae4f466f 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -276,7 +276,7 @@ func main() { Publisher: eventPublisherDriver, Logger: logger, - DLQ: conf.BalanceWorker.DLQ, + Config: conf.BalanceWorker.ConsumerConfiguration, }, EventBus: eventPublisher, diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index ca5a61482..8230af36a 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -271,7 +271,7 @@ func main() { Publisher: eventPublisherDriver, Logger: logger, - DLQ: conf.Notification.Consumer.DLQ, + Config: conf.Notification.Consumer, }, Marshaler: eventPublisher.Marshaler(), diff --git a/config/balanceworker.go b/config/balanceworker.go index 4717f8548..c3508993d 100644 --- a/config/balanceworker.go +++ b/config/balanceworker.go @@ -1,48 +1,23 @@ package config import ( - "errors" - "fmt" - "time" - "github.com/spf13/viper" ) type BalanceWorkerConfiguration struct { - DLQ DLQConfiguration - Retry RetryConfiguration - ConsumerGroupName string + ConsumerConfiguration `mapstructure:",squash"` } func (c BalanceWorkerConfiguration) Validate() error { - if err := c.DLQ.Validate(); err != nil { - return fmt.Errorf("poision queue: %w", err) - } - - if err := c.Retry.Validate(); err != nil { - return fmt.Errorf("retry: %w", err) - } - - if c.ConsumerGroupName == "" { - return errors.New("consumer group name is required") + if err := c.ConsumerConfiguration.Validate(); err != nil { + return err } return nil } func ConfigureBalanceWorker(v *viper.Viper) { - v.SetDefault("balanceWorker.dlq.enabled", true) + ConfigureConsumer(v, "balanceWorker") v.SetDefault("balanceWorker.dlq.topic", "om_sys.balance_worker_dlq") - v.SetDefault("balanceWorker.dlq.autoProvision.enabled", true) - v.SetDefault("balanceWorker.dlq.autoProvision.partitions", 1) - - v.SetDefault("balanceWorker.dlq.throttle.enabled", true) - // Let's throttle poision queue to 10 messages per second - v.SetDefault("balanceWorker.dlq.throttle.count", 10) - v.SetDefault("balanceWorker.dlq.throttle.duration", time.Second) - - v.SetDefault("balanceWorker.retry.maxRetries", 5) - v.SetDefault("balanceWorker.retry.initialInterval", 100*time.Millisecond) - v.SetDefault("balanceWorker.consumerGroupName", "om_balance_worker") } diff --git a/config/config_test.go b/config/config_test.go index d0026dd4d..d55e023ee 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -197,44 +197,42 @@ func TestComplete(t *testing.T) { }, }, BalanceWorker: BalanceWorkerConfiguration{ - DLQ: DLQConfiguration{ - Enabled: true, - Topic: "om_sys.balance_worker_dlq", - AutoProvision: AutoProvisionConfiguration{ - Enabled: true, - Partitions: 1, + ConsumerConfiguration: ConsumerConfiguration{ + ProcessingTimeout: 30 * time.Second, + Retry: RetryConfiguration{ + InitialInterval: 10 * time.Millisecond, + MaxInterval: time.Second, + MaxElapsedTime: time.Minute, }, - Throttle: ThrottleConfiguration{ - Enabled: true, - Count: 10, - Duration: time.Second, + DLQ: DLQConfiguration{ + Enabled: true, + Topic: "om_sys.balance_worker_dlq", + AutoProvision: DLQAutoProvisionConfiguration{ + Enabled: true, + Partitions: 1, + Retention: 90 * 24 * time.Hour, + }, }, + ConsumerGroupName: "om_balance_worker", }, - Retry: RetryConfiguration{ - MaxRetries: 5, - InitialInterval: 100 * time.Millisecond, - }, - ConsumerGroupName: "om_balance_worker", }, Notification: NotificationConfiguration{ Enabled: true, - Consumer: NotificationConsumerConfiguration{ + Consumer: ConsumerConfiguration{ + ProcessingTimeout: 30 * time.Second, + Retry: RetryConfiguration{ + InitialInterval: 10 * time.Millisecond, + MaxInterval: time.Second, + MaxElapsedTime: time.Minute, + }, DLQ: DLQConfiguration{ Enabled: true, Topic: "om_sys.notification_service_dlq", - AutoProvision: AutoProvisionConfiguration{ + AutoProvision: DLQAutoProvisionConfiguration{ Enabled: true, Partitions: 1, + Retention: 90 * 24 * time.Hour, }, - Throttle: ThrottleConfiguration{ - Enabled: true, - Count: 10, - Duration: time.Second, - }, - }, - Retry: RetryConfiguration{ - MaxRetries: 5, - InitialInterval: 100 * time.Millisecond, }, ConsumerGroupName: "om_notification_service", }, diff --git a/config/events.go b/config/events.go index dfd252753..77d3be8a1 100644 --- a/config/events.go +++ b/config/events.go @@ -37,8 +37,9 @@ func (c EventSubsystemConfiguration) Validate() error { } type AutoProvisionConfiguration struct { - Enabled bool - Partitions int + Enabled bool + Partitions int + DLQRetention time.Duration } func (c AutoProvisionConfiguration) Validate() error { @@ -48,11 +49,44 @@ func (c AutoProvisionConfiguration) Validate() error { return nil } +type ConsumerConfiguration struct { + // ProcessingTimeout is the maximum time a message is allowed to be processed before it is considered failed. 0 disables the timeout. + ProcessingTimeout time.Duration + + // Retry specifies how many times a message should be retried before it is sent to the DLQ. + Retry RetryConfiguration + + // ConsumerGroupName is the name of the consumer group that the consumer belongs to. + ConsumerGroupName string + + // DLQ specifies the configuration for the Dead Letter Queue. + DLQ DLQConfiguration +} + +func (c ConsumerConfiguration) Validate() error { + if c.ProcessingTimeout < 0 { + return errors.New("processing timeout must be positive or 0") + } + + if c.ConsumerGroupName == "" { + return errors.New("consumer group name is required") + } + + if err := c.Retry.Validate(); err != nil { + return fmt.Errorf("retry configuration is invalid: %w", err) + } + + if err := c.DLQ.Validate(); err != nil { + return fmt.Errorf("dlq configuration is invalid: %w", err) + } + + return nil +} + type DLQConfiguration struct { Enabled bool Topic string - AutoProvision AutoProvisionConfiguration - Throttle ThrottleConfiguration + AutoProvision DLQAutoProvisionConfiguration } func (c DLQConfiguration) Validate() error { @@ -64,52 +98,79 @@ func (c DLQConfiguration) Validate() error { return errors.New("topic name is required") } - if err := c.Throttle.Validate(); err != nil { - return fmt.Errorf("throttle: %w", err) + if err := c.AutoProvision.Validate(); err != nil { + return fmt.Errorf("auto provision configuration is invalid: %w", err) } return nil } -type ThrottleConfiguration struct { - Enabled bool - Count int64 - Duration time.Duration +type DLQAutoProvisionConfiguration struct { + Enabled bool + Partitions int + Retention time.Duration } -func (c ThrottleConfiguration) Validate() error { +func (c DLQAutoProvisionConfiguration) Validate() error { if !c.Enabled { return nil } - if c.Count <= 0 { - return errors.New("count must be greater than 0") + if c.Partitions < 1 { + return errors.New("partitions must be greater than 0") } - if c.Duration <= 0 { - return errors.New("duration must be greater than 0") + if c.Retention <= 0 { + return errors.New("retention must be greater than 0") } - return nil } type RetryConfiguration struct { - MaxRetries int + // MaxRetries is maximum number of times a retry will be attempted. Disabled if 0 + MaxRetries int + // InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier. InitialInterval time.Duration + // MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval. + MaxInterval time.Duration + // MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0. + MaxElapsedTime time.Duration } func (c RetryConfiguration) Validate() error { - if c.MaxRetries <= 0 { - return errors.New("max retries must be greater than 0") + if c.MaxRetries < 0 { + return errors.New("max retries must be positive or 0") + } + + if c.MaxElapsedTime < 0 { + return errors.New("max elapsed time must be positive or 0") } if c.InitialInterval <= 0 { return errors.New("initial interval must be greater than 0") } + if c.MaxInterval <= 0 { + return errors.New("max interval must be greater than 0") + } + return nil } +func ConfigureConsumer(v *viper.Viper, prefix string) { + v.SetDefault(prefix+".processingTimeout", 30*time.Second) + + v.SetDefault(prefix+".retry.maxRetries", 0) + v.SetDefault(prefix+".retry.initialInterval", 10*time.Millisecond) + v.SetDefault(prefix+".retry.maxInterval", time.Second) + v.SetDefault(prefix+".retry.maxElapsedTime", time.Minute) + + v.SetDefault(prefix+".dlq.enabled", true) + v.SetDefault(prefix+".dlq.autoProvision.enabled", true) + v.SetDefault(prefix+".dlq.autoProvision.partitions", 1) + v.SetDefault(prefix+".dlq.autoProvision.retention", 90*24*time.Hour) +} + func ConfigureEvents(v *viper.Viper) { // TODO: after the system events are fully implemented, we should enable them by default v.SetDefault("events.enabled", false) diff --git a/config/notification.go b/config/notification.go index fa53faec4..b9e265a1c 100644 --- a/config/notification.go +++ b/config/notification.go @@ -1,16 +1,14 @@ package config import ( - "errors" "fmt" - "time" "github.com/spf13/viper" ) type NotificationConfiguration struct { Enabled bool - Consumer NotificationConsumerConfiguration + Consumer ConsumerConfiguration } func (c NotificationConfiguration) Validate() error { @@ -20,40 +18,8 @@ func (c NotificationConfiguration) Validate() error { return nil } -type NotificationConsumerConfiguration struct { - DLQ DLQConfiguration - Retry RetryConfiguration - ConsumerGroupName string -} - -func (c NotificationConsumerConfiguration) Validate() error { - if err := c.DLQ.Validate(); err != nil { - return fmt.Errorf("poision queue: %w", err) - } - - if err := c.Retry.Validate(); err != nil { - return fmt.Errorf("retry: %w", err) - } - - if c.ConsumerGroupName == "" { - return errors.New("consumer group name is required") - } - return nil -} - func ConfigureNotification(v *viper.Viper) { - v.SetDefault("notification.consumer.dlq.enabled", true) + ConfigureConsumer(v, "notification.consumer") v.SetDefault("notification.consumer.dlq.topic", "om_sys.notification_service_dlq") - v.SetDefault("notification.consumer.dlq.autoProvision.enabled", true) - v.SetDefault("notification.consumer.dlq.autoProvision.partitions", 1) - - v.SetDefault("notification.consumer.dlq.throttle.enabled", true) - // Let's throttle poison queue to 10 messages per second - v.SetDefault("notification.consumer.dlq.throttle.count", 10) - v.SetDefault("notification.consumer.dlq.throttle.duration", time.Second) - - v.SetDefault("notification.consumer.retry.maxRetries", 5) - v.SetDefault("notification.consumer.retry.initialInterval", 100*time.Millisecond) - v.SetDefault("notification.consumer.consumerGroupName", "om_notification_service") } diff --git a/go.mod b/go.mod index a3ced63b2..371415040 100644 --- a/go.mod +++ b/go.mod @@ -355,6 +355,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/tetratelabs/wazero v1.6.0 // indirect github.com/tilinna/z85 v1.0.0 // indirect diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index 18cbcd394..c767634d7 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -78,15 +78,14 @@ func New(opts WorkerOptions) (*Worker, error) { highWatermarkCache: highWatermarkCache, } - eventHandler := worker.eventHandler() - - router, err := router.NewDefaultRouter(opts.Router, eventHandler) + router, err := router.NewDefaultRouter(opts.Router) if err != nil { return nil, err } worker.router = router + eventHandler := worker.eventHandler() router.AddNoPublisherHandler( "balance_worker_system_events", opts.SystemEventsTopic, diff --git a/internal/notification/consumer/consumer.go b/internal/notification/consumer/consumer.go index f0650378d..ff07b56ed 100644 --- a/internal/notification/consumer/consumer.go +++ b/internal/notification/consumer/consumer.go @@ -34,26 +34,24 @@ func New(opts Options) (*Consumer, error) { opts: opts, } - handler := grouphandler.NewNoPublishingHandler(opts.Marshaler, - grouphandler.NewGroupEventHandler(func(ctx context.Context, event *snapshot.SnapshotEvent) error { - if event == nil { - return nil - } - - return consumer.handleSnapshotEvent(ctx, *event) - }), - ) - - router, err := router.NewDefaultRouter(opts.Router, handler) + router, err := router.NewDefaultRouter(opts.Router) if err != nil { return nil, err } - router.AddNoPublisherHandler( + _ = router.AddNoPublisherHandler( "balance_consumer_system_events", opts.SystemEventsTopic, opts.Router.Subscriber, - handler, + grouphandler.NewNoPublishingHandler(opts.Marshaler, + grouphandler.NewGroupEventHandler(func(ctx context.Context, event *snapshot.SnapshotEvent) error { + if event == nil { + return nil + } + + return consumer.handleSnapshotEvent(ctx, *event) + }), + ), ) return &Consumer{ diff --git a/internal/watermill/router/context.go b/internal/watermill/router/context.go new file mode 100644 index 000000000..23a4869f4 --- /dev/null +++ b/internal/watermill/router/context.go @@ -0,0 +1,19 @@ +package router + +import ( + "github.com/ThreeDotsLabs/watermill/message" +) + +// RestoreContext ensures that the original context is restored after the handler is done processing the message. +// +// This helps with https://github.com/ThreeDotsLabs/watermill/issues/467 +func RestoreContext(h message.HandlerFunc) message.HandlerFunc { + return func(msg *message.Message) ([]*message.Message, error) { + origCtx := msg.Context() + defer func() { + msg.SetContext(origCtx) + }() + + return h(msg) + } +} diff --git a/internal/watermill/router/router.go b/internal/watermill/router/router.go index 4796e3338..8acec622e 100644 --- a/internal/watermill/router/router.go +++ b/internal/watermill/router/router.go @@ -3,14 +3,12 @@ package router import ( "errors" "log/slog" - "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/openmeterio/openmeter/config" - "github.com/openmeterio/openmeter/internal/watermill/nopublisher" ) type Options struct { @@ -18,7 +16,7 @@ type Options struct { Publisher message.Publisher Logger *slog.Logger - DLQ config.DLQConfiguration + Config config.ConsumerConfiguration } func (o *Options) Validate() error { @@ -26,7 +24,7 @@ func (o *Options) Validate() error { return errors.New("subscriber is required") } - if o.DLQ.Enabled && o.Publisher == nil { + if o.Publisher == nil { return errors.New("publisher is required") } @@ -34,10 +32,8 @@ func (o *Options) Validate() error { return errors.New("logger is required") } - if o.DLQ.Enabled { - if err := o.DLQ.Validate(); err != nil { - return err - } + if err := o.Config.Validate(); err != nil { + return err } return nil @@ -45,61 +41,55 @@ func (o *Options) Validate() error { // NewDefaultRouter creates a new router with the default middlewares, in case your consumer // would mandate a different setup, feel free to create your own router -// -// dlqHandler is the handler that will be called when a message is consumed from the DLQ, -// this is specified separately as the options struct is initialized externally from the consumer -// and the handler is initialized internally -func NewDefaultRouter(opts Options, dlqHandler message.NoPublishHandlerFunc) (*message.Router, error) { +func NewDefaultRouter(opts Options) (*message.Router, error) { if err := opts.Validate(); err != nil { return nil, err } - if opts.DLQ.Enabled { - if dlqHandler == nil { - return nil, errors.New("dlq handler is required") - } + router, err := message.NewRouter(message.RouterConfig{}, watermill.NewSlogLogger(opts.Logger)) + if err != nil { + return nil, err } - router, err := message.NewRouter(message.RouterConfig{}, watermill.NewSlogLogger(opts.Logger)) + // This should be the outermost middleware, to catch failures including the ones caused by Recoverer + + // If retry queue is not enabled, we can directly push messages to the DLQ + poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.Config.DLQ.Topic) if err != nil { return nil, err } router.AddMiddleware( - middleware.CorrelationID, - - middleware.Retry{ - MaxRetries: 5, - InitialInterval: 100 * time.Millisecond, - Logger: watermill.NewSlogLogger(opts.Logger), - }.Middleware, + poisionQueue, + ) + router.AddMiddleware( + middleware.CorrelationID, middleware.Recoverer, ) - if opts.DLQ.Enabled { - poisionQueue, err := middleware.PoisonQueue(opts.Publisher, opts.DLQ.Topic) - if err != nil { - return nil, err - } + router.AddMiddleware(middleware.Retry{ + MaxRetries: opts.Config.Retry.MaxRetries, + InitialInterval: opts.Config.Retry.InitialInterval, + MaxInterval: opts.Config.Retry.MaxInterval, + MaxElapsedTime: opts.Config.Retry.MaxElapsedTime, - router.AddMiddleware( - poisionQueue, - ) + Multiplier: 1.5, + RandomizationFactor: 0.25, + Logger: watermill.NewSlogLogger(opts.Logger), + }.Middleware) - poisionQueueProcessor := nopublisher.NoPublisherHandlerToHandlerFunc(dlqHandler) - if opts.DLQ.Throttle.Enabled { - poisionQueueProcessor = middleware.NewThrottle( - opts.DLQ.Throttle.Count, - opts.DLQ.Throttle.Duration, - ).Middleware(poisionQueueProcessor) - } - router.AddNoPublisherHandler( - "process_dlq", - opts.DLQ.Topic, - opts.Subscriber, - nopublisher.HandlerFuncToNoPublisherHandler(poisionQueueProcessor), + // This should be after Retry, so that we can retry on timeouts before pushing to DLQ + if opts.Config.ProcessingTimeout > 0 { + router.AddMiddleware( + // The Timeout middleware keeps the messages context overridden after returning, thus the retry will + // also timeout, thus we need to save the context before applying the Timeout middleware + // + // Issue: https://github.com/ThreeDotsLabs/watermill/issues/467 + RestoreContext, + middleware.Timeout(opts.Config.ProcessingTimeout), ) } + return router, nil } diff --git a/internal/watermill/router/router_test.go b/internal/watermill/router/router_test.go new file mode 100644 index 000000000..07d2ad084 --- /dev/null +++ b/internal/watermill/router/router_test.go @@ -0,0 +1,261 @@ +package router + +import ( + "context" + "errors" + "log/slog" + "testing" + "time" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/openmeterio/openmeter/config" +) + +type mockHandler struct { + mock.Mock +} + +const ( + WaitForContextHeader = "x-wait-for-context" +) + +func (m *mockHandler) Handle(msg *message.Message) error { + args := m.Called(msg) + err := args.Error(0) + + slog.Info("handling message (initial handler)", "result", err) + + if msg.Metadata.Get(WaitForContextHeader) != "" { + start := time.Now() + <-msg.Context().Done() + slog.Info("context done", "held_for", time.Since(start)) + } + + return err +} + +func (m *mockHandler) HandleDLQ(msg *message.Message) error { + slog.Info("message arrived at DLQ (final)") + + args := m.Called(msg) + return args.Error(0) +} + +type DoneCondition struct { + done chan struct{} +} + +func NewDoneSignal() *DoneCondition { + return &DoneCondition{ + done: make(chan struct{}, 1), + } +} + +func (d *DoneCondition) Done(mock.Arguments) { + close(d.done) +} + +func (d *DoneCondition) Wait(timeout time.Duration) error { + t := time.NewTimer(timeout) + defer t.Stop() + + select { + case <-d.done: + return nil + case <-t.C: + return errors.New("timeout") + } +} + +func TestDefaultRouter(t *testing.T) { + tcs := []struct { + Name string + Config Options + SetupMocks func(*mockHandler, *DoneCondition) + }{ + { + Name: "DLQ enabled, no retry queue, happy path", + Config: Options{ + Config: config.ConsumerConfiguration{ + Retry: config.RetryConfiguration{ + InitialInterval: 100 * time.Millisecond, + MaxInterval: 100 * time.Millisecond, + }, + }, + }, + SetupMocks: func(mh *mockHandler, done *DoneCondition) { + mh.On("Handle", mock.Anything).Return(nil).Run(done.Done).Once() + }, + }, + { + Name: "DLQ enabled, no retry queue, failed message", + // After the first failure the message is sent to the DLQ + Config: Options{ + Config: config.ConsumerConfiguration{ + Retry: config.RetryConfiguration{ + InitialInterval: 100 * time.Millisecond, + MaxInterval: 100 * time.Millisecond, + }, + }, + }, + SetupMocks: func(mh *mockHandler, done *DoneCondition) { + mh.On("Handle", mock.Anything).Return(assert.AnError).Once() + mh.On("HandleDLQ", mock.Anything).Return(nil).Run(done.Done).Once() + }, + }, + { + Name: "DLQ enabled, retry queue enabled, happy path", + // Message gets processed without any additional steps + Config: Options{ + Config: config.ConsumerConfiguration{ + Retry: config.RetryConfiguration{ + InitialInterval: 100 * time.Millisecond, + MaxInterval: 100 * time.Millisecond, + }, + }, + }, + SetupMocks: func(mh *mockHandler, done *DoneCondition) { + mh.On("Handle", mock.Anything).Return(nil).Run(done.Done).Once() + }, + }, + { + Name: "DLQ enabled, retry queue enabled, failed message", + // Message gets processed without any additional steps + Config: Options{ + Config: config.ConsumerConfiguration{ + Retry: config.RetryConfiguration{ + InitialInterval: 10 * time.Millisecond, + MaxInterval: 10 * time.Millisecond, + MaxRetries: 5, + }, + }, + }, + SetupMocks: func(mh *mockHandler, done *DoneCondition) { + // Flow: 1st failure -> retries -> DLQ + mh.On("Handle", mock.Anything).Return(assert.AnError).Times(5) + + mh.On("HandleDLQ", mock.Anything).Return(nil).Run(done.Done).Once() + }, + }, + { + Name: "Timeout handling", + // No retry queue, no DLQ, just timeout => retry every time the timeout passes + Config: Options{ + Config: config.ConsumerConfiguration{ + ProcessingTimeout: 50 * time.Millisecond, + Retry: config.RetryConfiguration{ + InitialInterval: 10 * time.Millisecond, + MaxInterval: 10 * time.Millisecond, + MaxRetries: 5, + }, + }, + }, + SetupMocks: func(mh *mockHandler, done *DoneCondition) { + mh.On("Handle", mock.Anything).Return(assert.AnError).Run(func(args mock.Arguments) { + msg := args.Get(0).(*message.Message) + // Let's instruct the handler to simulate a timeout + msg.Metadata.Set(WaitForContextHeader, "true") + }).Times(2) + + mh.On("Handle", mock.Anything).Return(nil).Run(done.Done).Once() + }, + }, + { + Name: "Timeout handling => DLQ", + // No retry queue, no DLQ, just timeout => retry every time the timeout passes + Config: Options{ + Config: config.ConsumerConfiguration{ + ProcessingTimeout: 100 * time.Millisecond, + Retry: config.RetryConfiguration{ + InitialInterval: 10 * time.Millisecond, + MaxInterval: 10 * time.Millisecond, + MaxRetries: 3, + }, + }, + }, + SetupMocks: func(mh *mockHandler, done *DoneCondition) { + mh.On("Handle", mock.Anything).Return(assert.AnError).Run(func(args mock.Arguments) { + msg := args.Get(0).(*message.Message) + // Let's instruct the handler to simulate a timeout + msg.Metadata.Set(WaitForContextHeader, "true") + }).Times(4) + + mh.On("HandleDLQ", mock.Anything).Return(nil).Run(done.Done).Once() + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.Name, func(t *testing.T) { + inMemoryPubSub := gochannel.NewGoChannel( + gochannel.Config{ + OutputChannelBuffer: 10, + }, + watermill.NewSlogLogger(slog.Default()), + ) + defer func() { + assert.NoError(t, inMemoryPubSub.Close()) + }() + + handler := mockHandler{} + + options := tc.Config + options.Subscriber = inMemoryPubSub + options.Publisher = inMemoryPubSub + options.Logger = slog.Default() + + options.Config.DLQ.Topic = "test-dlq" + options.Config.ConsumerGroupName = "test-group" + + router, err := NewDefaultRouter(options) + + assert.NoError(t, err) + assert.NotNil(t, router) + + const topicName = "testTopic" + + router.AddNoPublisherHandler( + "test", + topicName, + inMemoryPubSub, + handler.Handle, + ) + + router.AddNoPublisherHandler( + "test-dlq", + options.Config.DLQ.Topic, + inMemoryPubSub, + handler.HandleDLQ, + ) + + go func() { + assert.NoError(t, router.Run(context.Background())) + }() + + <-router.Running() + defer func() { + if router.IsRunning() && !router.IsClosed() { + assert.NoError(t, router.Close()) + } + }() + + done := NewDoneSignal() + tc.SetupMocks(&handler, done) + + msg := message.NewMessage(watermill.NewUUID(), []byte("testPayload")) + + assert.NoError(t, inMemoryPubSub.Publish(topicName, msg)) + + if !assert.NoError(t, done.Wait(20000*time.Second)) { + assert.FailNow(t, "timeout during test execution") + } + + assert.NoError(t, router.Close()) + }) + } +} diff --git a/openmeter/watermill/router/router.go b/openmeter/watermill/router/router.go index 0f0ca5eb0..5b72ad257 100644 --- a/openmeter/watermill/router/router.go +++ b/openmeter/watermill/router/router.go @@ -10,6 +10,6 @@ type ( Options = router.Options ) -func NewDefaultRouter(opts Options, dlqHandler message.NoPublishHandlerFunc) (*message.Router, error) { - return router.NewDefaultRouter(opts, dlqHandler) +func NewDefaultRouter(opts Options) (*message.Router, error) { + return router.NewDefaultRouter(opts) } From 8831608f15bdc4ffc5667d69378ac933941647c2 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 14 Aug 2024 20:21:50 +0200 Subject: [PATCH 2/2] feat: add support for setting the retention of dlq topics --- cmd/balance-worker/main.go | 1 + cmd/notification-service/main.go | 1 + internal/watermill/driver/kafka/topic_provision.go | 8 ++++++++ 3 files changed, 10 insertions(+) diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index aae4f466f..74ad16af2 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -341,6 +341,7 @@ func initEventPublisherDriver(ctx context.Context, broker watermillkafka.BrokerO provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ Topic: conf.BalanceWorker.DLQ.Topic, NumPartitions: int32(conf.BalanceWorker.DLQ.AutoProvision.Partitions), + Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention, }) } diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index 8230af36a..ee78734c1 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -334,6 +334,7 @@ func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf con provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ Topic: conf.Notification.Consumer.DLQ.Topic, NumPartitions: int32(conf.Notification.Consumer.DLQ.AutoProvision.Partitions), + Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention, }) } diff --git a/internal/watermill/driver/kafka/topic_provision.go b/internal/watermill/driver/kafka/topic_provision.go index 6c9c40340..9e75a0434 100644 --- a/internal/watermill/driver/kafka/topic_provision.go +++ b/internal/watermill/driver/kafka/topic_provision.go @@ -3,6 +3,8 @@ package kafka import ( "context" "log/slog" + "strconv" + "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) @@ -10,6 +12,7 @@ import ( type AutoProvisionTopic struct { Topic string NumPartitions int32 + Retention time.Duration } // provisionTopics creates the topics if they don't exist. This relies on the confluent kafka lib, as the sarama doesn't seem to @@ -26,10 +29,15 @@ func provisionTopics(ctx context.Context, logger *slog.Logger, config kafka.Conf defer adminClient.Close() for _, topic := range topics { + topicConfig := map[string]string{} + if topic.Retention > 0 { + topicConfig["retention.ms"] = strconv.FormatInt(topic.Retention.Milliseconds(), 10) + } result, err := adminClient.CreateTopics(ctx, []kafka.TopicSpecification{ { Topic: topic.Topic, NumPartitions: int(topic.NumPartitions), + Config: topicConfig, }, }) if err != nil {