Skip to content

Commit

Permalink
do not throw exception for existing cap-msg-group message header. ove…
Browse files Browse the repository at this point in the history
…rwrite it instead. tolerate kafka message duplicate headers (#1623)
  • Loading branch information
PoteRii authored Dec 5, 2024
1 parent 40a56d2 commit 09f2a05
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Task Consume()

var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null);

message.Headers.Add(Headers.Group, _groupId);
message.Headers[Headers.Group] = _groupId;

return OnMessageCallback!(message, response.Messages[0].ReceiptHandle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private TransportMessage ConvertMessage(ServiceBusReceivedMessage message)
var headers = message.ApplicationProperties
.ToDictionary(x => x.Key, y => y.Value?.ToString());

headers.Add(Headers.Group, _subscriptionName);
headers[Headers.Group] = _subscriptionName;

if (_asbOptions.CustomHeadersBuilder != null)
{
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ private async Task Consume(ConsumeResult<string, byte[]> consumerResult)
foreach (var header in consumerResult.Message.Headers)
{
var val = header.GetValueBytes();
headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null);
headers[header.Key] = val != null ? Encoding.UTF8.GetString(val) : null;
}

headers.Add(Headers.Group, _groupId);
headers[Headers.Group] = _groupId;

if (_kafkaOptions.CustomHeadersBuilder != null)
{
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ Task Consume()
headers.Add(h, e.Message.Header[h]);
}

headers.Add(Headers.Group, _groupName);
headers[Headers.Group] = _groupName;

if (_natsOptions.CustomHeadersBuilder != null)
{
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Task Consume()
headers.Add(header.Key, header.Value);
}

headers.Add(Headers.Group, _groupId);
headers[Headers.Group] = _groupId;

var message = new TransportMessage(headers, consumerResult.Data);

Expand Down
3 changes: 2 additions & 1 deletion src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
Expand Down Expand Up @@ -67,7 +68,7 @@ Task Consume()
headers.Add(header.Key, header.Value?.ToString());
}

headers.Add(Messages.Headers.Group, _groupName);
headers[Messages.Headers.Group] = _groupName;

if (_customHeadersBuilder != null)
{
Expand Down

0 comments on commit 09f2a05

Please sign in to comment.