Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poc/nats jetstream #100

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9d176a3
It works!
mbaneryd May 11, 2023
f8f5df2
Start separating into Nats/Jetstream
NiklasArbin Aug 17, 2023
bfbbf08
more work
NiklasArbin Aug 17, 2023
f54fb07
Start extracting NatsQueueChannelReceiverBase
NiklasArbin Aug 17, 2023
2eba547
downgrade to 6
NiklasArbin Aug 17, 2023
88cd3f2
start jetstream for the docker
NiklasArbin Aug 17, 2023
f5ce2f4
create messages for js
NiklasArbin Aug 18, 2023
68d4316
wip channel receiver
NiklasArbin Aug 18, 2023
06013d2
Use base class
NiklasArbin Aug 18, 2023
3306a0d
move Msg.Ack() into statehandler
NiklasArbin Aug 18, 2023
996555d
create consumer
NiklasArbin Aug 18, 2023
1cc0129
use jetstream message interfaces
NiklasArbin Aug 18, 2023
00b409a
Add plumbing for integration tests
NiklasArbin Aug 21, 2023
307d732
Fix
NiklasArbin Aug 21, 2023
d3c2e75
Send message
NiklasArbin Aug 21, 2023
0918747
Use service replacements
NiklasArbin Aug 22, 2023
7f7671f
Trying events
NiklasArbin Aug 22, 2023
7b7d17e
Remove usings
NiklasArbin Aug 22, 2023
a7cb8a3
playing around with sources
NiklasArbin Aug 23, 2023
0b8945c
hmmz
NiklasArbin Aug 25, 2023
3907904
semi working events
NiklasArbin Aug 25, 2023
150ec06
initial working dl queue
NiklasArbin Aug 28, 2023
36ad695
Use full subject in QueueName
NiklasArbin Aug 28, 2023
a5c0982
working initial tests
NiklasArbin Aug 28, 2023
e6a6e73
Move setup to JetStreamIntegrationTest
NiklasArbin Aug 28, 2023
66c45fb
Use push
NiklasArbin Aug 28, 2023
17f97d2
bump gpt review
NiklasArbin Sep 12, 2023
2ae13a8
Merge branch 'master' into poc/nats-jetstream
NiklasArbin Sep 12, 2023
42814a1
Merge branch 'master' into poc/nats-jetstream
Mar 6, 2024
91cfd71
use message preprocessors
Mar 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dotnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Docker Redis
run: docker run -d -p 6379:6379 --name redis6379 redis
- name: Docker nats
run: docker run -d -p 4222:4222 nats:latest
run: docker run -d -p 4222:4222 nats:latest -js
- name: Docker Azurite
run: docker run -d -p 10000:10000 -p 10001:10001 mcr.microsoft.com/azure-storage/azurite
- name: Docker SQL
Expand Down
14 changes: 14 additions & 0 deletions KnightBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KnightBus.Nats.Messages", "
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KnightBus.Nats.Tests.Unit", "knightbus-nats\tests\KnightBus.Nats.Tests.Unit\KnightBus.Nats.Tests.Unit.csproj", "{4E160CA8-3B3F-404B-ACE5-FA9B696BC921}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KnightBus.Examples.Nats.Client", "knightbus\examples\KnightBus.Examples.Nats.Client\KnightBus.Examples.Nats.Client.csproj", "{7A330440-21D1-4437-A216-69617195086D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KnightBus.Nats.Tests.Integration", "knightbus-nats\tests\KnightBus.Nats.Tests.Integration\KnightBus.Nats.Tests.Integration.csproj", "{B3B1E85B-BADD-4778-98B8-E514C984054F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -227,6 +231,14 @@ Global
{4E160CA8-3B3F-404B-ACE5-FA9B696BC921}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4E160CA8-3B3F-404B-ACE5-FA9B696BC921}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4E160CA8-3B3F-404B-ACE5-FA9B696BC921}.Release|Any CPU.Build.0 = Release|Any CPU
{7A330440-21D1-4437-A216-69617195086D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7A330440-21D1-4437-A216-69617195086D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7A330440-21D1-4437-A216-69617195086D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7A330440-21D1-4437-A216-69617195086D}.Release|Any CPU.Build.0 = Release|Any CPU
{B3B1E85B-BADD-4778-98B8-E514C984054F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B3B1E85B-BADD-4778-98B8-E514C984054F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B3B1E85B-BADD-4778-98B8-E514C984054F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B3B1E85B-BADD-4778-98B8-E514C984054F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -254,6 +266,8 @@ Global
{3C9566A8-EB30-491A-9215-B62A30F05764} = {1A7A20B2-65E7-4712-B93B-4AA3F73017A8}
{BC97493B-C23A-40A9-AB96-51D93ED689C3} = {8598A638-84C1-4FE0-9E95-ABE6079A00E7}
{4E160CA8-3B3F-404B-ACE5-FA9B696BC921} = {8598A638-84C1-4FE0-9E95-ABE6079A00E7}
{7A330440-21D1-4437-A216-69617195086D} = {1A7A20B2-65E7-4712-B93B-4AA3F73017A8}
{B3B1E85B-BADD-4778-98B8-E514C984054F} = {8598A638-84C1-4FE0-9E95-ABE6079A00E7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {14950A56-4265-429E-AF71-2D98B3B8AC48}
Expand Down
2 changes: 2 additions & 0 deletions KnightBus.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=Deadletter/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
10 changes: 10 additions & 0 deletions knightbus-nats/src/KnightBus.Nats.Messages/IJetStreamCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using KnightBus.Messages;

namespace KnightBus.Nats.Messages
{

public interface IJetStreamCommand : ICommand
{

}
}
10 changes: 10 additions & 0 deletions knightbus-nats/src/KnightBus.Nats.Messages/IJetStreamEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using KnightBus.Messages;

namespace KnightBus.Nats.Messages
{

public interface IJetStreamEvent : IEvent
{

}
}
10 changes: 10 additions & 0 deletions knightbus-nats/src/KnightBus.Nats.Messages/IJetStreamRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using KnightBus.Messages;

namespace KnightBus.Nats.Messages
{

public interface IJetStreamRequest : IRequest
{

}
}
126 changes: 126 additions & 0 deletions knightbus-nats/src/KnightBus.Nats/JetStreamBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using KnightBus.Core;
using KnightBus.Core.Exceptions;
using KnightBus.Core.PreProcessors;
using KnightBus.Messages;
using KnightBus.Nats.Messages;
using NATS.Client;
using NATS.Client.JetStream;

namespace KnightBus.Nats
{
public interface IJetStreamBus
{
Task Send(IJetStreamCommand message, CancellationToken cancellationToken = default);
Task Publish(IJetStreamEvent message, CancellationToken cancellationToken = default);
Task<TResponse> RequestAsync<T, TResponse>(IJetStreamRequest request, CancellationToken cancellationToken = default) where T : IJetStreamRequest;
IEnumerable<TResponse> RequestStream<T, TResponse>(IJetStreamRequest command, CancellationToken cancellationToken = default) where T : IJetStreamRequest;
}

public class JetStreamBus : IJetStreamBus
{
private readonly IConnection _connection;
private readonly IJetStreamConfiguration _configuration;
private readonly IEnumerable<IMessagePreProcessor> _messagePreProcessors;
private readonly IJetStream _streamContext;


public JetStreamBus(IConnection connection, IJetStreamConfiguration configuration, IEnumerable<IMessagePreProcessor> messagePreProcessors)
{
_connection = connection;
_streamContext = _connection.CreateJetStreamContext(configuration.JetStreamOptions);
_configuration = configuration;
_messagePreProcessors = messagePreProcessors;
}

public Task Send(IJetStreamCommand message, CancellationToken cancellationToken = default)
{
return SendInternal(message, cancellationToken);
}

public Task Publish(IJetStreamEvent message, CancellationToken cancellationToken = default)
{
return SendInternal(message, cancellationToken);
}

private async Task SendInternal(IMessage message, CancellationToken cancellationToken = default)
{
var mapping = AutoMessageMapper.GetMapping(message.GetType());
var serializer = _configuration.MessageSerializer;
if (mapping is ICustomMessageSerializer customSerializer) serializer = customSerializer.MessageSerializer;

var msg = new Msg(mapping.QueueName, serializer.Serialize(message));


foreach (var preProcessor in _messagePreProcessors)

Check failure on line 58 in knightbus-nats/src/KnightBus.Nats/JetStreamBus.cs

View workflow job for this annotation

GitHub Actions / DotNET Tests

KnightBus.Nats.Tests.Integration.JetStream.CommandTests ► Should_process_command_once

Failed test found in: knightbus-nats/tests/KnightBus.Nats.Tests.Integration/TestResults/test-results.trx Error: System.NullReferenceException : Object reference not set to an instance of an object.
Raw output
System.NullReferenceException : Object reference not set to an instance of an object.
   at KnightBus.Nats.JetStreamBus.SendInternal(IMessage message, CancellationToken cancellationToken) in /home/runner/work/knightbus/knightbus/knightbus-nats/src/KnightBus.Nats/JetStreamBus.cs:line 58
   at KnightBus.Nats.Tests.Integration.JetStream.CommandTests.Should_process_command_once() in /home/runner/work/knightbus/knightbus/knightbus-nats/tests/KnightBus.Nats.Tests.Integration/JetStream/CommandTests.cs:line 66
   at NUnit.Framework.Internal.TaskAwaitAdapter.GenericAdapter`1.GetResult()
   at NUnit.Framework.Internal.AsyncToSyncAdapter.Await(Func`1 invoke)
   at NUnit.Framework.Internal.Commands.TestMethodCommand.RunTestMethod(TestExecutionContext context)
   at NUnit.Framework.Internal.Commands.TestMethodCommand.Execute(TestExecutionContext context)
   at NUnit.Framework.Internal.Commands.BeforeAndAfterTestCommand.<>c__DisplayClass1_0.<Execute>b__0()
   at NUnit.Framework.Internal.Commands.DelegatingTestCommand.RunTestMethodInThreadAbortSafeZone(TestExecutionContext context, Action action)

Check failure on line 58 in knightbus-nats/src/KnightBus.Nats/JetStreamBus.cs

View workflow job for this annotation

GitHub Actions / DotNET Tests

KnightBus.Nats.Tests.Integration.JetStream.EventTests ► Should_process_event_twice

Failed test found in: knightbus-nats/tests/KnightBus.Nats.Tests.Integration/TestResults/test-results.trx Error: System.NullReferenceException : Object reference not set to an instance of an object.
Raw output
System.NullReferenceException : Object reference not set to an instance of an object.
   at KnightBus.Nats.JetStreamBus.SendInternal(IMessage message, CancellationToken cancellationToken) in /home/runner/work/knightbus/knightbus/knightbus-nats/src/KnightBus.Nats/JetStreamBus.cs:line 58
   at KnightBus.Nats.Tests.Integration.JetStream.EventTests.Should_process_event_twice() in /home/runner/work/knightbus/knightbus/knightbus-nats/tests/KnightBus.Nats.Tests.Integration/JetStream/EventTests.cs:line 21
   at NUnit.Framework.Internal.TaskAwaitAdapter.GenericAdapter`1.GetResult()
   at NUnit.Framework.Internal.AsyncToSyncAdapter.Await(Func`1 invoke)
   at NUnit.Framework.Internal.Commands.TestMethodCommand.RunTestMethod(TestExecutionContext context)
   at NUnit.Framework.Internal.Commands.TestMethodCommand.Execute(TestExecutionContext context)
   at NUnit.Framework.Internal.Commands.BeforeAndAfterTestCommand.<>c__DisplayClass1_0.<Execute>b__0()
   at NUnit.Framework.Internal.Commands.DelegatingTestCommand.RunTestMethodInThreadAbortSafeZone(TestExecutionContext context, Action action)
{
var properties = await preProcessor.PreProcess(message, cancellationToken);
foreach (var property in properties)
{
msg.Header.Add(property.Key, property.Value.ToString());
}
}

await _streamContext.PublishAsync(msg).ConfigureAwait(false);
}

public async Task<TResponse> RequestAsync<T, TResponse>(IJetStreamRequest request,
CancellationToken cancellationToken = default) where T : IJetStreamRequest
{
var mapping = AutoMessageMapper.GetMapping(request.GetType());
var serializer = _configuration.MessageSerializer;
if (mapping is ICustomMessageSerializer customSerializer) serializer = customSerializer.MessageSerializer;

var reply = await _connection
.RequestAsync(mapping.QueueName, serializer.Serialize(request), cancellationToken)
.ConfigureAwait(false);
ThrowIfErrorResponse(reply);
return serializer.Deserialize<TResponse>(reply.Data.AsSpan());
}

public IEnumerable<TResponse> RequestStream<T, TResponse>(IJetStreamRequest command,
CancellationToken cancellationToken = default) where T : IJetStreamRequest
{
var mapping = AutoMessageMapper.GetMapping(command.GetType());
var serializer = _configuration.MessageSerializer;
if (mapping is ICustomMessageSerializer customSerializer) serializer = customSerializer.MessageSerializer;

var inbox = Guid.NewGuid().ToString("N");
using var sub = _connection.SubscribeSync(inbox);
_connection.Publish(mapping.QueueName, inbox, serializer.Serialize(command));

do
{
var msg = sub.NextMessage();
msg.Ack();
if (msg.Data.Length == 0)
{
if (msg.HasHeaders && msg.Header[MsgConstants.HeaderName] == MsgConstants.Completed)
{
break;
}

ThrowIfErrorResponse(msg);

yield return default;
}

yield return serializer.Deserialize<TResponse>(msg.Data.AsSpan());

} while (true);

sub.Unsubscribe();
}

private void ThrowIfErrorResponse(Msg msg)
{
if (msg.Data.Length == 0 && msg.HasHeaders && msg.Header[MsgConstants.HeaderName] == MsgConstants.Error)
{
throw new NATSException("Receiver failed");
}
}
}
}
32 changes: 32 additions & 0 deletions knightbus-nats/src/KnightBus.Nats/JetStreamChannelFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using KnightBus.Core;
using KnightBus.Messages;
using KnightBus.Nats.Messages;

namespace KnightBus.Nats
{
public class JetStreamChannelFactory : ITransportChannelFactory
{
public JetStreamChannelFactory(IJetStreamConfiguration configuration)
{
Configuration = configuration;
}

public ITransportConfiguration Configuration { get; set; }

public IChannelReceiver Create(Type messageType, IEventSubscription subscription, IProcessingSettings processingSettings,
IMessageSerializer serializer, IHostConfiguration configuration, IMessageProcessor processor)
{

var readerType = typeof(JetStreamChannelReceiver<>).MakeGenericType(messageType);
var reader = (IChannelReceiver)Activator.CreateInstance(readerType, processingSettings, serializer, configuration, processor, Configuration, subscription);

return reader;
}

public bool CanCreate(Type messageType)
{
return typeof(IJetStreamCommand).IsAssignableFrom(messageType) || typeof(IJetStreamEvent).IsAssignableFrom(messageType) || typeof(IJetStreamRequest).IsAssignableFrom(messageType);
}
}
}
99 changes: 99 additions & 0 deletions knightbus-nats/src/KnightBus.Nats/JetStreamChannelReceiver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System;
using System.Threading;
using KnightBus.Core;
using KnightBus.Messages;
using NATS.Client;
using NATS.Client.JetStream;

namespace KnightBus.Nats
{
public static class JetStreamHelpers
{
public static string GetRootDeadletterSubject(string name) => $"dl-{name}";

public static string GetRootSubject(string subject) => subject.IndexOf('.') < 0 ? subject : subject[..subject.IndexOf('.')];
public static string ConvertToDeadletterSubject(string subject)
{
var root = GetRootSubject(subject);
return subject.Replace(root, GetRootDeadletterSubject(root));
}


}
public class JetStreamChannelReceiver<T> : NatsChannelReceiverBase<T>
where T : class, IMessage
{
private readonly IJetStreamConfiguration _configuration;
private readonly IEventSubscription _subscription;
private const string CommandDeliverGroup = "qg";


public JetStreamChannelReceiver(IProcessingSettings settings, IMessageSerializer serializer, IHostConfiguration hostConfiguration,
IMessageProcessor processor, IJetStreamConfiguration configuration, IEventSubscription subscription)
: base(settings, hostConfiguration, serializer, processor)
{
_configuration = configuration;
_subscription = subscription;
}

public override IJetStreamPushSyncSubscription Subscribe(IConnection connection, CancellationToken cancellationToken)
{
var jetStreamManagement = connection.CreateJetStreamManagementContext(_configuration.JetStreamOptions);
string deliverGroup;
if (_subscription is null)
deliverGroup = CommandDeliverGroup;
else
deliverGroup = _subscription.Name;

var queueName = JetStreamHelpers.GetRootSubject(AutoMessageMapper.GetQueueName<T>());
var streamName = $"{queueName}-stream";
var streamSubject = $"{queueName}.>";

var deadletterName = JetStreamHelpers.GetRootDeadletterSubject(queueName);
var deadLetterStreamName = $"{deadletterName}-stream";
var deadletterSubject = $"{deadletterName}.>";

var streamConfig = StreamConfiguration.Builder()
.WithName(streamName)
.WithSubjects(streamSubject)
.WithStorageType(StorageType.Memory)
.WithRetentionPolicy(RetentionPolicy.Interest)
.Build();

UpsertStream(jetStreamManagement, streamConfig);

var dlStreamConfig = StreamConfiguration.Builder()
.WithName(deadLetterStreamName)
.WithSubjects(deadletterSubject)
.WithStorageType(StorageType.Memory)
.WithRetentionPolicy(RetentionPolicy.WorkQueue)
.Build();

UpsertStream(jetStreamManagement, dlStreamConfig);

var jetStream = connection.CreateJetStreamContext(_configuration.JetStreamOptions);

var durable = $"{streamName}-{deliverGroup}-dr";

var subscribeOptions = ConsumerConfiguration.Builder()
.WithDurable(durable)
.WithAckPolicy(AckPolicy.Explicit)
.WithAckWait((long)Math.Round(Settings.MessageLockTimeout.TotalMilliseconds, MidpointRounding.AwayFromZero))
.BuildPushSubscribeOptions();

return jetStream.PushSubscribeSync(streamSubject, deliverGroup, subscribeOptions);
}

private void UpsertStream(IJetStreamManagement jetStreamManagement, StreamConfiguration streamConfiguration)
{
try
{
jetStreamManagement.UpdateStream(streamConfiguration);
}
catch (NATSJetStreamException e) when (e.ErrorCode == 404)
{
jetStreamManagement.AddStream(streamConfiguration);
}
}
}
}
25 changes: 25 additions & 0 deletions knightbus-nats/src/KnightBus.Nats/JetStreamConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using KnightBus.Messages;
using KnightBus.Newtonsoft;
using NATS.Client;
using NATS.Client.JetStream;

namespace KnightBus.Nats
{
public interface IJetStreamConfiguration : INatsConfiguration
{
public JetStreamOptions JetStreamOptions { get; }
}

public class JetStreamConfiguration : IJetStreamConfiguration
{
public string ConnectionString
{
get => Options?.Url;
set => Options.Url = value;
}

public IMessageSerializer MessageSerializer { get; set; } = new NewtonsoftSerializer();
public Options Options { get; } = ConnectionFactory.GetDefaultOptions();
public JetStreamOptions JetStreamOptions { get; set; }
}
}
3 changes: 2 additions & 1 deletion knightbus-nats/src/KnightBus.Nats/NatsBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,14 @@ public IEnumerable<TResponse> RequestStream<T, TResponse>(INatsRequest command,
var serializer = _configuration.MessageSerializer;
if (mapping is ICustomMessageSerializer customSerializer) serializer = customSerializer.MessageSerializer;

var inbox = Guid.NewGuid().ToString("N");
var inbox = _connection.NewInbox();
using var sub = _connection.SubscribeSync(inbox);
_connection.Publish(mapping.QueueName, inbox, serializer.Serialize(command));

do
{
var msg = sub.NextMessage();
msg.Ack();
if (msg.Data.Length == 0)
{
if (msg.HasHeaders && msg.Header[MsgConstants.HeaderName] == MsgConstants.Completed)
Expand Down
Loading
Loading