Skip to content

Commit

Permalink
feat: add support for setting the retention of dlq topics
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Aug 15, 2024
1 parent 0b07819 commit 8831608
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func initEventPublisherDriver(ctx context.Context, broker watermillkafka.BrokerO
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.BalanceWorker.DLQ.Topic,
NumPartitions: int32(conf.BalanceWorker.DLQ.AutoProvision.Partitions),
Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention,
})
}

Expand Down
1 change: 1 addition & 0 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf con
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.Notification.Consumer.DLQ.Topic,
NumPartitions: int32(conf.Notification.Consumer.DLQ.AutoProvision.Partitions),
Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention,
})
}

Expand Down
8 changes: 8 additions & 0 deletions internal/watermill/driver/kafka/topic_provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package kafka
import (
"context"
"log/slog"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type AutoProvisionTopic struct {
Topic string
NumPartitions int32
Retention time.Duration
}

// provisionTopics creates the topics if they don't exist. This relies on the confluent kafka lib, as the sarama doesn't seem to
Expand All @@ -26,10 +29,15 @@ func provisionTopics(ctx context.Context, logger *slog.Logger, config kafka.Conf
defer adminClient.Close()

for _, topic := range topics {
topicConfig := map[string]string{}
if topic.Retention > 0 {
topicConfig["retention.ms"] = strconv.FormatInt(topic.Retention.Milliseconds(), 10)
}
result, err := adminClient.CreateTopics(ctx, []kafka.TopicSpecification{
{
Topic: topic.Topic,
NumPartitions: int(topic.NumPartitions),
Config: topicConfig,
},
})
if err != nil {
Expand Down

0 comments on commit 8831608

Please sign in to comment.