From b0db28caf6fb8c2622b8d38bc13c1e52c50e9e07 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 15 Jan 2024 10:02:23 +0800 Subject: [PATCH] [fix] Fix DLQ producer name conflicts when multiples consumers send messages to DLQ (#1156) ### Motivation To keep consistent with the Java client. Releted PR: https://github.com/apache/pulsar/pull/21890 ### Modifications Set DLQ producerName `fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)` (cherry picked from commit 4e138228fe501ec39856afbc5e541e87a143b73f) --- pulsar/consumer_impl.go | 2 +- pulsar/consumer_regex_test.go | 6 ++++-- pulsar/consumer_test.go | 6 ++++-- pulsar/dlq_router.go | 6 ++++-- pulsar/reader_impl.go | 2 +- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index d701ab16d6..75d839b412 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } - dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, client.log) + dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log) if err != nil { return nil, err } diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 3e5f1d61db..ebd7e4e196 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -152,9 +152,10 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string opts := ConsumerOptions{ SubscriptionName: "regex-sub", AutoDiscoveryPeriod: 5 * time.Minute, + Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { @@ -190,9 +191,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string opts := ConsumerOptions{ SubscriptionName: "regex-sub", AutoDiscoveryPeriod: 5 * time.Minute, + Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 8b983d0d19..df70b0dd0b 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1449,13 +1449,15 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { if prodOpt != nil { dlqPolicy.ProducerOptions = *prodOpt } - sub := "my-sub" + sub, consumerName := "my-sub", "my-consumer" + consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: sub, NackRedeliveryDelay: 1 * time.Second, Type: Shared, DLQ: &dlqPolicy, + Name: consumerName, }) assert.Nil(t, err) defer consumer.Close() @@ -1508,7 +1510,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { assert.Equal(t, []byte(expectMsg), msg.Payload()) // check dql produceName - assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ", topic, sub)) + assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ", topic, sub, consumerName)) // check original messageId assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID]) diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 5b9314bddc..6be35d7485 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -34,16 +34,18 @@ type dlqRouter struct { closeCh chan interface{} topicName string subscriptionName string + consumerName string log log.Logger } -func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, +func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName, consumerName string, logger log.Logger) (*dlqRouter, error) { r := &dlqRouter{ client: client, policy: policy, topicName: topicName, subscriptionName: subscriptionName, + consumerName: consumerName, log: logger, } @@ -159,7 +161,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { opt.Topic = r.policy.DeadLetterTopic opt.Schema = schema if opt.Name == "" { - opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName, r.subscriptionName) + opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName) } // the origin code sets to LZ4 compression with no options diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 0999e88fee..7b260b88db 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -127,7 +127,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } // Provide dummy dlq router with not dlq policy - dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, client.log) + dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log) if err != nil { return nil, err }