Skip to content

Commit

Permalink
Update NatsAdapterFactory with logging and error handling
Browse files Browse the repository at this point in the history
- Added logger parameter to ToBatch method
- Implemented error handling for null headers
- Added logging for each header key-value pair
  • Loading branch information
0xF6 committed Jan 3, 2025
1 parent 41b292d commit 812231b
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/Argon.Api/Features/Orleanse/Streams/NatsAdapterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ namespace Argon.Features.OrleansStreamingProviders;

public static class NatsMsgExtension
{
public static ArgonEventBatch ToBatch(this NatsJSMsg<string> msg, OrleansJsonSerializer serializationManager)
public static ArgonEventBatch ToBatch(this NatsJSMsg<string> msg, ILogger logger, OrleansJsonSerializer serializationManager)
{
if (msg.Headers is null)
throw new NullReferenceException($"СУКА Я СВОЮ МАТЬ ЕБАЛ ЗА ТАКУЮ ХУЙНЮ");

foreach (var natsHeader in msg.Headers)
logger.LogError($"natsHeader: {natsHeader.Key}:{natsHeader.Value}");

var data = serializationManager.Deserialize(typeof(object), msg.Data);
var stream = msg.Headers?["streamId"][0];
var streamId = StreamId.Parse(Encoding.UTF8.GetBytes(stream));
Expand Down Expand Up @@ -101,7 +107,7 @@ await consumer.FetchAsync<string>(new NatsJSFetchOpts
{
MaxMsgs = 1, // TODO: for later optimizations change this number
Expires = TimeSpan.FromSeconds(1)
}).Select(natsMsg => natsMsg.ToBatch(serializationManager)).Select(IBatchContainer (dummy) => dummy).ToListAsync();
}).Select(natsMsg => natsMsg.ToBatch(logger, serializationManager)).Select(IBatchContainer (dummy) => dummy).ToListAsync();

public async Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
{
Expand Down

0 comments on commit 812231b

Please sign in to comment.