Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: postgres saga support #120

Merged
merged 9 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/workflows/dotnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,8 @@ jobs:
run: dotnet pack --no-build --configuration Release --output ${{ env.NuGetDirectory }}

# Install docker test dependencies
- name: Docker SQL
run: docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=Password12!' -e 'MSSQL_PID=Express' -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-latest
- name: Docker Azurite
run: docker run -d -p 10000:10000 -p 10001:10001 mcr.microsoft.com/azure-storage/azurite
- name: Docker Redis
run: docker run -d -p 6379:6379 --name redis6379 redis
- name: Docker nats
run: docker run -d -p 4222:4222 nats:latest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageIconUrl>https://raw.githubusercontent.com/BookBeat/knightbus/master/documentation/media/images/knighbus-64.png</PackageIconUrl>
<PackageIcon>knighbus-64.png</PackageIcon>
<RepositoryUrl>https://github.com/BookBeat/knightbus</RepositoryUrl>
<Version>1.0.2</Version>
<Version>1.1.0</Version>
<PackageTags>knightbus;postgresql;queues;messaging</PackageTags>
<LangVersion>latest</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using KnightBus.Core.Sagas;
using KnightBus.PostgreSql.Sagas;
using Microsoft.Extensions.DependencyInjection;
using Npgsql;

Expand All @@ -18,4 +20,10 @@ public static IServiceCollection UsePostgres(
services.AddScoped<IPostgresBus, PostgresBus>();
return services;
}

public static IServiceCollection UsePostgresSagaStore(this IServiceCollection services)
{
services.EnableSagas<PostgresSagaStore>();
return services;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ await QueueInitializer.InitSubscription(

protected override bool ShouldCreateChannel(Exception e)
{
return e is PostgresException { SqlState: "42P01" };
return e is PostgresException { SqlState: PostgresErrorCodes.UndefinedTable };
}

protected override async Task CleanupResources()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
using KnightBus.Core.Sagas;
using KnightBus.Core.Sagas.Exceptions;
using KnightBus.Messages;
using Npgsql;
using static KnightBus.PostgreSql.PostgresConstants;

namespace KnightBus.PostgreSql.Sagas;

public class PostgresSagaStore : ISagaStore
{
private readonly NpgsqlDataSource _npgsqlDataSource;
private readonly IMessageSerializer _serializer;

private const string Table = "sagas";

public PostgresSagaStore(NpgsqlDataSource npgsqlDataSource, IPostgresConfiguration configuration)
{
_npgsqlDataSource = npgsqlDataSource;
_serializer = configuration.MessageSerializer;
}

private async Task CreateSagaTable()
{
const string query = $"""
CREATE SCHEMA IF NOT EXISTS {SchemaName};
CREATE TABLE IF NOT EXISTS {SchemaName}.{Table} (
partition_key VARCHAR(50) NOT NULL,
id VARCHAR(50) NOT NULL,
data BYTEA NOT NULL,
expiration TIMESTAMP WITH TIME ZONE NOT NULL,
PRIMARY KEY (partition_key, id)
);
""";
await using var command = _npgsqlDataSource.CreateCommand(query);
await command.ExecuteNonQueryAsync(CancellationToken.None);
}

public async Task<SagaData<T>> GetSaga<T>(string partitionKey, string id, CancellationToken ct)
{
const string query =
$"SELECT data FROM {SchemaName}.{Table} WHERE partition_key = $1 AND id = $2 AND expiration > $3";
await using var connection = await _npgsqlDataSource.OpenConnectionAsync(ct);
await using var command = new NpgsqlCommand(query, connection);
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = partitionKey });
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = id });
command.Parameters.Add(new NpgsqlParameter<DateTime> { TypedValue = DateTime.UtcNow });

try
{
await command.PrepareAsync(ct);
await using var reader = await command.ExecuteReaderAsync(ct).ConfigureAwait(false);
if (!await reader.ReadAsync(ct).ConfigureAwait(false)) throw new SagaNotFoundException(partitionKey, id);

var bytes = await reader.GetFieldValueAsync<ReadOnlyMemory<byte>>(0, ct).ConfigureAwait(false);
var data = _serializer.Deserialize<T>(bytes);
return new SagaData<T> { Data = data };
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UndefinedTable)
{
await CreateSagaTable();
return await GetSaga<T>(partitionKey, id, ct);
}
}

public async Task<SagaData<T>> Create<T>(string partitionKey, string id, T sagaData, TimeSpan ttl,
CancellationToken ct)
{
const string query = $"""
INSERT INTO {SchemaName}.{Table} (partition_key, id, data, expiration) VALUES ($1, $2, $3, $4)
ON CONFLICT (partition_key, id) DO UPDATE SET
data = EXCLUDED.data,
expiration = EXCLUDED.expiration
WHERE {SchemaName}.{Table}.expiration < $5;
""";
await using var connection = await _npgsqlDataSource.OpenConnectionAsync(ct);
await using var command = new NpgsqlCommand(query, connection);

command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = partitionKey });
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = id });
command.Parameters.Add(new NpgsqlParameter<byte[]> { TypedValue = _serializer.Serialize<T>(sagaData) });
command.Parameters.Add(new NpgsqlParameter<DateTime> { TypedValue = DateTime.UtcNow.Add(ttl) });
command.Parameters.Add(new NpgsqlParameter<DateTime> { TypedValue = DateTime.UtcNow });

try
{
await command.PrepareAsync(ct);
var affectedRows = await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);

if (affectedRows != 1) throw new SagaAlreadyStartedException(partitionKey, id);
return new SagaData<T> { Data = sagaData };
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UndefinedTable)
{
await CreateSagaTable();
return await Create(partitionKey, id, sagaData, ttl, ct);
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UniqueViolation)
{
throw new SagaAlreadyStartedException(partitionKey, id);
}
}

public async Task Update<T>(string partitionKey, string id, SagaData<T> sagaData, CancellationToken ct)
{
const string query = $"UPDATE {SchemaName}.{Table} SET data = $1 WHERE partition_key = $2 AND id = $3";

await using var connection = await _npgsqlDataSource.OpenConnectionAsync(ct);
await using var command = new NpgsqlCommand(query, connection);
var data = _serializer.Serialize(sagaData.Data);
command.Parameters.Add(new NpgsqlParameter<byte[]> { TypedValue = data });
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = partitionKey });
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = id });

try
{
await command.PrepareAsync(ct);
var affectedRows = await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
if (affectedRows != 1) throw new SagaNotFoundException(partitionKey, id);
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UndefinedTable)
{
await CreateSagaTable();
await Update(partitionKey, id, sagaData, ct);
}
}

public async Task Complete<T>(string partitionKey, string id, SagaData<T> sagaData, CancellationToken ct)
{
const string query = $"DELETE FROM {SchemaName}.{Table} WHERE partition_key = $1 AND id = $2";
await using var connection = await _npgsqlDataSource.OpenConnectionAsync(ct);
await using var command = new NpgsqlCommand(query, connection);
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = partitionKey });
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = id });

try
{
await command.PrepareAsync(ct);
var affectedRows = await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
if (affectedRows != 1) throw new SagaNotFoundException(partitionKey, id);
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UndefinedTable)
{
await CreateSagaTable();
await Complete(partitionKey, id, sagaData, ct);
}
}

public async Task Delete(string partitionKey, string id, CancellationToken ct)
{
const string query = $"DELETE FROM {SchemaName}.{Table} WHERE partition_key = $1 AND id = $2";
await using var connection = await _npgsqlDataSource.OpenConnectionAsync(ct);
await using var command = new NpgsqlCommand(query, connection);
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = partitionKey });
command.Parameters.Add(new NpgsqlParameter<string> { TypedValue = id });

try
{
await command.PrepareAsync(ct);
var affectedRows = await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
if (affectedRows != 1) throw new SagaNotFoundException(partitionKey, id);
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UndefinedTable)
{
await CreateSagaTable();
await Delete(partitionKey, id, ct);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using KnightBus.PostgreSql.Sagas;
using KnightBus.Shared.Tests.Integration;
using NUnit.Framework;
using static KnightBus.PostgreSql.Tests.Integration.PostgresSetup;

namespace KnightBus.PostgreSql.Tests.Integration;

[TestFixture]
public class PostgresSagaStoreTests : SagaStoreTests
{
public override void Setup()
{
SagaStore = new PostgresSagaStore(DataSource, new PostgresConfiguration(DataSource.ConnectionString));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
</PackageReference>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="Testcontainers.Redis" Version="3.8.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.Threading.Tasks;
using NUnit.Framework;
using Testcontainers.Redis;

namespace KnightBus.Redis.Tests.Integration;

[SetUpFixture]
public class RedisSetup
{
private static readonly RedisContainer Redis = new RedisBuilder()
.WithImage("redis")
.WithPortBinding(6380, 6379)
.Build();

public static string ConnectionString;

[OneTimeSetUp]
public async Task OneTimeSetup()
{
await Redis.StartAsync();
ConnectionString = Redis.GetConnectionString();
}

[OneTimeTearDown]
public async Task Teardown()
{
await Redis.DisposeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public sealed class RedisTestBase
[OneTimeSetUp]
public void BaseSetup()
{
Configuration = new RedisConfiguration("localhost:6379");
Configuration = new RedisConfiguration(RedisSetup.ConnectionString);
Multiplexer = ConnectionMultiplexer.Connect($"{Configuration.ConnectionString},allowAdmin=true");
Database = Multiplexer.GetDatabase(Configuration.DatabaseId);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<PackageReference Include="Moq" Version="4.20.70" />
<PackageReference Include="NUnit" Version="4.1.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="Testcontainers.MsSql" Version="3.8.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\knightbus\tests\KnightBus.Shared.Tests.Integration\KnightBus.Shared.Tests.Integration.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ public class SqlSagaStoreTests : SagaStoreTests
{
public override void Setup()
{
SagaStore = new SqlServerSagaStore(DatabaseInitializer.ConnectionString);
SagaStore = new SqlServerSagaStore(SqlServerSetup.ConnectionString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using NUnit.Framework;
using Testcontainers.MsSql;

namespace KnightBus.SqlServer.Tests.Integration;

[SetUpFixture]
public class SqlServerSetup
{
private const string DatabaseName = "KnightBus";

private static readonly MsSqlContainer MsSql = new MsSqlBuilder()
.WithImage("mcr.microsoft.com/mssql/server:2019-latest")
.WithPortBinding(14333, 1433)
.Build();

public static string ConnectionString;

[OneTimeSetUp]
public async Task OneTimeSetup()
{
await MsSql.StartAsync();
ConnectionString = MsSql.GetConnectionString();
}

[OneTimeTearDown]
public async Task Teardown()
{
await MsSql.DisposeAsync();
}
}
Loading
Loading