From 8f95ed947e08a7b52bcd191a4eec0f62630ac6cb Mon Sep 17 00:00:00 2001 From: Plamen Kutinchev Date: Fri, 16 Feb 2024 09:04:56 +0200 Subject: [PATCH] Add ability to receive headers when consuming messages from RabbitMQ. --- rabbitmq/consumer.go | 22 ++++++++++++++++------ rabbitmq/handler.go | 2 ++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/rabbitmq/consumer.go b/rabbitmq/consumer.go index 8836603..a727665 100644 --- a/rabbitmq/consumer.go +++ b/rabbitmq/consumer.go @@ -90,7 +90,10 @@ func (c *Consumer) Run(ctx context.Context) error { c.logger.Info("Received context cancel. Going to close RMQ connections.") err = channel.Cancel(c.handler.GetConsumerTag(), false) if err != nil { - c.logger.Warn("failed to cancel the RMQ channel while stopping handler", logger.ErrorField(err)) + c.logger.Warn( + "failed to cancel the RMQ channel while stopping handler", + logger.ErrorField(err), + ) } // NOTE: We must process the events before we close the channel @@ -112,7 +115,11 @@ func (c *Consumer) Run(ctx context.Context) error { err = channel.Qos(c.cfg.PrefetchCount, 0, false) if err != nil { - return stacktrace.Propagate(err, "failed to set RMQ channel's QoS prefetch count to: %d", c.cfg.PrefetchCount) + return stacktrace.Propagate( + err, + "failed to set RMQ channel's QoS prefetch count to: %d", + c.cfg.PrefetchCount, + ) } deliveries, err := channel.Consume( @@ -164,10 +171,13 @@ func (c *Consumer) handleDeliveries( func (c *Consumer) handleSingleDelivery(ctx context.Context, d *amqp.Delivery) error { c.metric.ObserveMsgDelivered() - acknowledgement, err := c.handler.ReceiveMessage(ctx, &Message{ - Body: d.Body, - CorrelationID: d.CorrelationId, - }) + acknowledgement, err := c.handler.ReceiveMessage( + ctx, &Message{ + Body: d.Body, + CorrelationID: d.CorrelationId, + Headers: d.Headers, + }, + ) if err != nil { return stacktrace.Propagate(err, "handler returned error") } diff --git a/rabbitmq/handler.go b/rabbitmq/handler.go index 7ec0888..06323af 100644 --- a/rabbitmq/handler.go +++ b/rabbitmq/handler.go @@ -50,4 +50,6 @@ type Message struct { // Correlation identifier CorrelationID string + // Message headers + Headers map[string]interface{} }