diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index aae4f466f..74ad16af2 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -341,6 +341,7 @@ func initEventPublisherDriver(ctx context.Context, broker watermillkafka.BrokerO provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ Topic: conf.BalanceWorker.DLQ.Topic, NumPartitions: int32(conf.BalanceWorker.DLQ.AutoProvision.Partitions), + Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention, }) } diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index 8230af36a..ee78734c1 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -334,6 +334,7 @@ func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf con provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ Topic: conf.Notification.Consumer.DLQ.Topic, NumPartitions: int32(conf.Notification.Consumer.DLQ.AutoProvision.Partitions), + Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention, }) } diff --git a/internal/watermill/driver/kafka/topic_provision.go b/internal/watermill/driver/kafka/topic_provision.go index 6c9c40340..9e75a0434 100644 --- a/internal/watermill/driver/kafka/topic_provision.go +++ b/internal/watermill/driver/kafka/topic_provision.go @@ -3,6 +3,8 @@ package kafka import ( "context" "log/slog" + "strconv" + "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) @@ -10,6 +12,7 @@ import ( type AutoProvisionTopic struct { Topic string NumPartitions int32 + Retention time.Duration } // provisionTopics creates the topics if they don't exist. This relies on the confluent kafka lib, as the sarama doesn't seem to @@ -26,10 +29,15 @@ func provisionTopics(ctx context.Context, logger *slog.Logger, config kafka.Conf defer adminClient.Close() for _, topic := range topics { + topicConfig := map[string]string{} + if topic.Retention > 0 { + topicConfig["retention.ms"] = strconv.FormatInt(topic.Retention.Milliseconds(), 10) + } result, err := adminClient.CreateTopics(ctx, []kafka.TopicSpecification{ { Topic: topic.Topic, NumPartitions: int(topic.NumPartitions), + Config: topicConfig, }, }) if err != nil {