Skip to content

Commit

Permalink
chore: add example saga
Browse files Browse the repository at this point in the history
  • Loading branch information
elmaxe committed May 25, 2024
1 parent 4f7b1ef commit 6026d6b
Showing 1 changed file with 72 additions and 4 deletions.
76 changes: 72 additions & 4 deletions knightbus/examples/KnightBus.Examples.PostgreSql/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using KnightBus.Core;
using KnightBus.Core.DependencyInjection;
using KnightBus.Core.Sagas;
using KnightBus.Host;
using KnightBus.Messages;
using KnightBus.PostgreSql;
Expand Down Expand Up @@ -36,6 +37,7 @@ static async Task Main(string[] args)
configuration.ConnectionString = connectionString;
configuration.PollingDelay = TimeSpan.FromMilliseconds(250);
})
.UsePostgresSagaStore()
.RegisterProcessors(typeof(SamplePostgresMessage).Assembly)
//Enable the postgres Transport
.UseTransport<PostgresTransport>();
Expand All @@ -51,9 +53,12 @@ static async Task Main(string[] args)
var client =
(PostgresBus)knightBusHost.Services.CreateScope().ServiceProvider.GetRequiredService<IPostgresBus>();

await client.PublishAsync(new SamplePostgresEvent() { MessageBody = "Yo" }, CancellationToken.None);
// Start the saga
await client.SendAsync(new SamplePostgresSagaStarterCommand(), CancellationToken.None);

await client.PublishAsync(new SamplePostgresEvent { MessageBody = "Yo" }, CancellationToken.None);
var messages = new List<SamplePostgresMessage>();
for (int i = 0; i < 10000; i++)
for (int i = 0; i < 0; i++)
{
messages.Add(new SamplePostgresMessage { MessageBody = i.ToString() });
}
Expand All @@ -67,8 +72,6 @@ static async Task Main(string[] args)
await client.SendAsync(new SamplePoisonPostgresMessage { MessageBody = $"error_{Guid.NewGuid()}" }, default );

Console.ReadKey();

await knightBusHost.StopAsync();
}
}

Expand All @@ -87,6 +90,17 @@ class SamplePoisonPostgresMessage : IPostgresCommand
public required string MessageBody { get; set; }
}


class SamplePostgresSagaStarterCommand : IPostgresCommand
{
public string SagaId => "abe7d7a5b99a475291aa7c7b25589308";
}

class SamplePostgresSagaCommand : IPostgresCommand
{
public string SagaId => "abe7d7a5b99a475291aa7c7b25589308";
}

class SamplePostgresEventMapping : IMessageMapping<SamplePostgresEvent>
{
public string QueueName => "sample_topic";
Expand All @@ -110,6 +124,16 @@ class SamplePoisonPostgresMessageMapping : IMessageMapping<SamplePoisonPostgresM
public string QueueName => "poisoned_postgres_sample_message";
}

class SamplePostgresSagaStarterCommandMapping : IMessageMapping<SamplePostgresSagaStarterCommand>
{
public string QueueName => "sample_postgres_saga_start_command";
}

class SamplePostgresSagaCommandMapping : IMessageMapping<SamplePostgresSagaCommand>
{
public string QueueName => "sample_postgres_saga_command";
}

class PostgresCommandProcessor :
IProcessCommand<SamplePostgresMessage, PostgresProcessingSetting>,
IProcessCommand<SamplePoisonPostgresMessage, PostgresProcessingSetting>,
Expand Down Expand Up @@ -142,3 +166,47 @@ class PostgresProcessingSetting : IProcessingSettings
public TimeSpan MessageLockTimeout => TimeSpan.FromMinutes(5);
public int DeadLetterDeliveryLimit => 2;
}

class PostgresSagaData
{
public int Counter { get; set; }
}

class PostgresSagaProcessor : Saga<PostgresSagaData>,
IProcessCommand<SamplePostgresSagaStarterCommand, PostgresProcessingSetting>,
IProcessCommand<SamplePostgresSagaCommand, PostgresProcessingSetting>
{
public override string PartitionKey => "postgres-saga-processor";
public override TimeSpan TimeToLive => TimeSpan.FromHours(1);

private readonly IPostgresBus _bus;

public PostgresSagaProcessor(IPostgresBus bus)
{
_bus = bus;

MessageMapper.MapStartMessage<SamplePostgresSagaStarterCommand>(m => m.SagaId);
MessageMapper.MapMessage<SamplePostgresSagaCommand>(m => m.SagaId);
}

public async Task ProcessAsync(SamplePostgresSagaStarterCommand message, CancellationToken cancellationToken)
{
await _bus.SendAsync(new SamplePostgresSagaCommand(), cancellationToken);
}

public async Task ProcessAsync(SamplePostgresSagaCommand message, CancellationToken cancellationToken)
{
Data.Counter++;
await UpdateAsync(CancellationToken.None);
Console.WriteLine($"Saga value was {Data.Counter}");
if (Data.Counter < 10)
{
await _bus.SendAsync(new SamplePostgresSagaCommand(), CancellationToken.None);
}
else
{
await CompleteAsync(CancellationToken.None);
Console.WriteLine("Saga completed");
}
}
}

0 comments on commit 6026d6b

Please sign in to comment.