diff --git a/.github/workflows/dotnet-core.yml b/.github/workflows/dotnet-core.yml
index cffc94a..1a81fcc 100644
--- a/.github/workflows/dotnet-core.yml
+++ b/.github/workflows/dotnet-core.yml
@@ -16,7 +16,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@v1
with:
- dotnet-version: 7.0.x
+ dotnet-version: 8.0.x
- name: Install dependencies
run: dotnet restore
- name: Build
diff --git a/Directory.Build.props b/Directory.Build.props
index 3f33599..aed0064 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -5,8 +5,8 @@
- net7.0
- 7.0.0
+ net8.0
+ 8.0.0
diff --git a/Orleans.Sagas.Samples/Orleans.Sagas.Samples.csproj b/Orleans.Sagas.Samples/Orleans.Sagas.Samples.csproj
index bb646c8..976bb97 100644
--- a/Orleans.Sagas.Samples/Orleans.Sagas.Samples.csproj
+++ b/Orleans.Sagas.Samples/Orleans.Sagas.Samples.csproj
@@ -10,11 +10,11 @@
-
+
-
+
diff --git a/Orleans.Sagas.Tests/Orleans.Sagas.Tests.csproj b/Orleans.Sagas.Tests/Orleans.Sagas.Tests.csproj
index a5cb7e4..8619095 100644
--- a/Orleans.Sagas.Tests/Orleans.Sagas.Tests.csproj
+++ b/Orleans.Sagas.Tests/Orleans.Sagas.Tests.csproj
@@ -7,10 +7,10 @@
-
-
-
-
+
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/Orleans.Sagas/ActivityDefinition.cs b/Orleans.Sagas/ActivityDefinition.cs
index f30be8e..ecd5400 100644
--- a/Orleans.Sagas/ActivityDefinition.cs
+++ b/Orleans.Sagas/ActivityDefinition.cs
@@ -2,9 +2,13 @@
namespace Orleans.Sagas
{
+ [GenerateSerializer]
+ [Alias("Orleans.Sagas.ActivityDefinition")]
public class ActivityDefinition
{
+ [Id(0)]
public Type Type { get; }
+ [Id(1)]
public ISagaPropertyBag Properties { get; }
public ActivityDefinition(Type type, ISagaPropertyBag properties)
diff --git a/Orleans.Sagas/Orleans.Sagas.csproj b/Orleans.Sagas/Orleans.Sagas.csproj
index da06ee4..9794292 100644
--- a/Orleans.Sagas/Orleans.Sagas.csproj
+++ b/Orleans.Sagas/Orleans.Sagas.csproj
@@ -2,7 +2,7 @@
$(DefaultTargetFrameworkVersion)
- 0.0.45-pre
+ 0.0.46-pre
A distributed saga implementation for Orleans
diff --git a/Orleans.Sagas/SagaGrain.cs b/Orleans.Sagas/SagaGrain.cs
index 767cf2f..a30c4f7 100644
--- a/Orleans.Sagas/SagaGrain.cs
+++ b/Orleans.Sagas/SagaGrain.cs
@@ -1,361 +1,361 @@
-using Microsoft.Extensions.Logging;
-using Orleans.Runtime;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Runtime.Serialization;
-using System.Threading.Tasks;
-
-namespace Orleans.Sagas
-{
- public sealed class SagaGrain : Grain, ISagaGrain
- {
- private static readonly string ReminderName = nameof(SagaGrain);
- private readonly IGrainContextAccessor grainContextAccessor;
- private readonly IServiceProvider serviceProvider;
- private readonly ILogger logger;
- private bool isActive;
- private IGrainReminder grainReminder;
- private IErrorTranslator _errorTranslator;
-
- public SagaGrain(IGrainContextAccessor grainContextAccessor, IServiceProvider serviceProvider, ILogger logger)
- {
- this.grainContextAccessor = grainContextAccessor;
- this.serviceProvider = serviceProvider;
- this.logger = logger;
- }
-
- protected override async Task ReadStateAsync()
- {
- try
- {
- await base.ReadStateAsync();
- }
- catch (Exception ex) when (ex is SerializationException || ex is InvalidCastException)
- {
- logger.LogError(1, ex, "Failed to read state");
- await HandleReadingStateError();
- }
- }
-
- private async Task HandleReadingStateError()
- {
- try
- {
- await UnRegisterReminderAsync();
- isActive = false;
- }
- catch (Exception ex)
- {
- logger.LogError(1, ex, "Failed to handle reading state error");
- }
- }
-
- public async Task RequestAbort()
- {
- logger.LogWarning(0, $"Saga {this} received an abort request.");
-
- // register abort request in separate grain in-case storage is mutating.
- await GetSagaCancellationGrain().RequestAbort();
-
- await ResumeAsync();
- }
-
- public async Task Execute(IEnumerable activities, ISagaPropertyBag sagaProperties, IErrorTranslator exceptionTranslator)
- {
- _errorTranslator = exceptionTranslator ?? new DefaultErrorTranslator();
-
- if (State.Status == SagaStatus.NotStarted)
- {
- State.Activities = activities.ToList();
- State.Properties = sagaProperties is null
- ? new Dictionary()
- : ((SagaPropertyBag)sagaProperties).ContextProperties;
- State.Status = SagaStatus.Executing;
- await WriteStateAsync();
- await RegisterReminderAsync();
- }
-
- await ResumeAsync();
- }
-
- public Task GetStatus()
- {
- return Task.FromResult(State.Status);
- }
-
- public async Task GetSagaError()
- {
- if (!State.Properties.ContainsKey(SagaPropertyBagKeys.ActivityErrorPropertyKey))
- {
- await Task.CompletedTask;
- return null;
- }
-
- return State.Properties[SagaPropertyBagKeys.ActivityErrorPropertyKey];
- }
-
- public Task HasCompleted()
- {
- return Task.FromResult(
- State.Status == SagaStatus.Aborted ||
- State.Status == SagaStatus.Compensated ||
- State.Status == SagaStatus.Executed
- );
- }
-
- public async Task ReceiveReminder(string reminderName, TickStatus status)
- {
- await TryToInitGrainReminderAsync();
-
- await ResumeAsync();
- }
-
- public Task ResumeAsync()
- {
- if (!isActive)
- {
- ResumeNoWaitAsync().Ignore();
- }
- return Task.CompletedTask;
- }
-
- public override string ToString()
- {
- return this.GetPrimaryKey().ToString();
- }
-
- private ISagaCancellationGrain GetSagaCancellationGrain()
- {
- return GrainFactory.GetGrain(this.GetPrimaryKey());
- }
-
- private async Task RegisterReminderAsync()
- {
- var reminderTime = TimeSpan.FromMinutes(1);
- grainReminder = await this.RegisterOrUpdateReminder(ReminderName, reminderTime, reminderTime);
- }
-
- private async Task UnRegisterReminderAsync()
- {
- await TryToInitGrainReminderAsync();
-
- if (grainReminder == null)
- {
- return;
- }
-
- try
- {
- await this.UnregisterReminder(grainReminder);
- grainReminder = null;
- }
- catch (Exception ex)
- {
- logger.LogError(1, ex, "Failed to unregister the reminder");
- }
- }
-
- private async Task ResumeNoWaitAsync()
- {
- isActive = true;
-
- try
- {
- if (State.NumCompletedActivities > 0)
- {
- await CheckForAbortAsync();
- }
-
- while (State.Status == SagaStatus.Executing ||
- State.Status == SagaStatus.Compensating)
- {
- switch (State.Status)
- {
- case SagaStatus.Executing:
- await ResumeExecuting();
- break;
- case SagaStatus.Compensating:
- await ResumeCompensating();
- break;
- }
- }
-
- switch (State.Status)
- {
- case SagaStatus.NotStarted:
- ResumeNotStarted();
- break;
- case SagaStatus.Executed:
- case SagaStatus.Compensated:
- case SagaStatus.Aborted:
- ResumeCompleted();
- break;
- }
-
- await UnRegisterReminderAsync();
- }
- finally
- {
- isActive = false;
- }
- }
-
- private void ResumeNotStarted()
- {
- logger.LogError(0, $"Saga {this} is attempting to resume but was never started.");
- }
-
- private IActivity GetActivity(ActivityDefinition definition)
- {
- return (IActivity)serviceProvider.GetService(definition.Type);
- }
-
- private async Task ResumeExecuting()
- {
- while (State.NumCompletedActivities < State.Activities.Count)
- {
- var definition = State.Activities[State.NumCompletedActivities];
- var currentActivity = GetActivity(definition);
-
- try
- {
- logger.LogDebug($"Executing activity #{State.NumCompletedActivities} '{currentActivity.GetType().Name}'...");
- var context = CreateActivityRuntimeContext(definition);
- await currentActivity.Execute(context);
- logger.LogDebug($"...activity #{State.NumCompletedActivities} '{currentActivity.GetType().Name}' complete.");
- State.NumCompletedActivities++;
- AddPropertiesToState(context);
- await WriteStateAsync();
- }
- catch (Exception e)
- {
- logger.LogWarning(0, "Activity '" + currentActivity.GetType().Name + "' in saga '" + GetType().Name + "' failed with " + e.GetType().Name);
- State.CompensationIndex = State.NumCompletedActivities;
- State.Status = SagaStatus.Compensating;
- AddActivityError(e);
- await WriteStateAsync();
-
- return;
- }
-
- // To ensure running first activity
- if (await CheckForAbortAsync())
- {
- return;
- }
- }
-
- if (await CheckForAbortAsync())
- {
- return;
- }
-
- State.Status = SagaStatus.Executed;
- await WriteStateAsync();
- }
-
- private async Task CheckForAbortAsync()
- {
- if (await GetSagaCancellationGrain().HasAbortBeenRequested())
- {
- if (!State.HasBeenAborted)
- {
- State.HasBeenAborted = true;
- State.Status = SagaStatus.Compensating;
- State.CompensationIndex = State.NumCompletedActivities - 1;
- await WriteStateAsync();
- }
-
- return true;
- }
-
- return false;
- }
-
- private async Task ResumeCompensating()
- {
- while (State.CompensationIndex >= 0)
- {
- var definition = State.Activities[State.CompensationIndex];
- var currentActivity = GetActivity(definition);
-
- try
- {
- logger.LogDebug(0, $"Compensating for activity #{State.CompensationIndex} '{currentActivity.GetType().Name}'...");
- var context = CreateActivityRuntimeContext(definition);
- await currentActivity.Compensate(context);
- logger.LogDebug(0, $"...activity #{State.CompensationIndex} '{currentActivity.GetType().Name}' compensation complete.");
- State.CompensationIndex--;
- await WriteStateAsync();
- }
- catch (Exception ex)
- {
- logger.LogWarning(0, "Activity '" + currentActivity.GetType().Name + "' in saga '" + GetType().Name + "' failed while compensating with " + ex.GetType().Name, ex);
- await Task.Delay(5000);
- // TODO: handle compensation failure with expoential backoff.
- // TODO: maybe eventual accept failure in a CompensationFailed state?
- }
- }
-
- State.Status = State.HasBeenAborted
- ? SagaStatus.Aborted
- : SagaStatus.Compensated;
- await WriteStateAsync();
- }
-
- private void AddPropertiesToState(ActivityContext context)
- {
- var propertyBag = (SagaPropertyBag)context.SagaProperties;
- foreach (var property in propertyBag.ContextProperties)
- {
- State.Properties[property.Key] = property.Value;
- }
- }
-
- private ActivityContext CreateActivityRuntimeContext(ActivityDefinition definition)
- {
- var propertyBag = (SagaPropertyBag)definition.Properties;
- IEnumerable> properties = State.Properties;
-
- if (propertyBag != null)
- {
- properties = properties.Concat(propertyBag.ContextProperties);
- }
-
- return new ActivityContext(
- this.GetPrimaryKey(),
- GrainFactory,
- grainContextAccessor,
- properties.ToDictionary(x => x.Key, y => y.Value)
- );
- }
-
- private void ResumeCompleted()
- {
- logger.LogInformation($"Saga {this} has completed with status '{State.Status}'.");
- }
-
- private void AddActivityError(Exception exception)
- {
- try
- {
- State.Properties[SagaPropertyBagKeys.ActivityErrorPropertyKey] = _errorTranslator?.Translate(exception);
- }
- catch (Exception ex)
- {
- logger.LogError(ex, "Failed to tranlsate exception.");
- }
- }
-
- private async Task TryToInitGrainReminderAsync()
- {
- if (grainReminder != null)
- {
- return;
- }
-
- grainReminder = await this.GetReminder(ReminderName);
- }
- }
-}
+using Microsoft.Extensions.Logging;
+using Orleans.Runtime;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Threading.Tasks;
+
+namespace Orleans.Sagas
+{
+ public sealed class SagaGrain : Grain, ISagaGrain
+ {
+ private static readonly string ReminderName = nameof(SagaGrain);
+ private readonly IGrainContextAccessor grainContextAccessor;
+ private readonly IServiceProvider serviceProvider;
+ private readonly ILogger logger;
+ private bool isActive;
+ private IGrainReminder grainReminder;
+ private IErrorTranslator _errorTranslator;
+
+ public SagaGrain(IGrainContextAccessor grainContextAccessor, IServiceProvider serviceProvider, ILogger logger)
+ {
+ this.grainContextAccessor = grainContextAccessor;
+ this.serviceProvider = serviceProvider;
+ this.logger = logger;
+ }
+
+ protected override async Task ReadStateAsync()
+ {
+ try
+ {
+ await base.ReadStateAsync();
+ }
+ catch (Exception ex) when (ex is SerializationException || ex is InvalidCastException)
+ {
+ logger.LogError(1, ex, "Failed to read state");
+ await HandleReadingStateError();
+ }
+ }
+
+ private async Task HandleReadingStateError()
+ {
+ try
+ {
+ await UnRegisterReminderAsync();
+ isActive = false;
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(1, ex, "Failed to handle reading state error");
+ }
+ }
+
+ public async Task RequestAbort()
+ {
+ logger.LogWarning(0, $"Saga {this} received an abort request.");
+
+ // register abort request in separate grain in-case storage is mutating.
+ await GetSagaCancellationGrain().RequestAbort();
+
+ await ResumeAsync();
+ }
+
+ public async Task Execute(IEnumerable activities, ISagaPropertyBag sagaProperties, IErrorTranslator exceptionTranslator)
+ {
+ _errorTranslator = exceptionTranslator ?? new DefaultErrorTranslator();
+
+ if (State.Status == SagaStatus.NotStarted)
+ {
+ State.Activities = activities.ToList();
+ State.Properties = sagaProperties is null
+ ? new Dictionary()
+ : ((SagaPropertyBag)sagaProperties).ContextProperties;
+ State.Status = SagaStatus.Executing;
+ await WriteStateAsync();
+ await RegisterReminderAsync();
+ }
+
+ await ResumeAsync();
+ }
+
+ public Task GetStatus()
+ {
+ return Task.FromResult(State.Status);
+ }
+
+ public async Task GetSagaError()
+ {
+ if (!State.Properties.ContainsKey(SagaPropertyBagKeys.ActivityErrorPropertyKey))
+ {
+ await Task.CompletedTask;
+ return null;
+ }
+
+ return State.Properties[SagaPropertyBagKeys.ActivityErrorPropertyKey];
+ }
+
+ public Task HasCompleted()
+ {
+ return Task.FromResult(
+ State.Status == SagaStatus.Aborted ||
+ State.Status == SagaStatus.Compensated ||
+ State.Status == SagaStatus.Executed
+ );
+ }
+
+ public async Task ReceiveReminder(string reminderName, TickStatus status)
+ {
+ await TryToInitGrainReminderAsync();
+
+ await ResumeAsync();
+ }
+
+ public Task ResumeAsync()
+ {
+ if (!isActive)
+ {
+ ResumeNoWaitAsync().Ignore();
+ }
+ return Task.CompletedTask;
+ }
+
+ public override string ToString()
+ {
+ return this.GetPrimaryKey().ToString();
+ }
+
+ private ISagaCancellationGrain GetSagaCancellationGrain()
+ {
+ return GrainFactory.GetGrain(this.GetPrimaryKey());
+ }
+
+ private async Task RegisterReminderAsync()
+ {
+ var reminderTime = TimeSpan.FromMinutes(1);
+ grainReminder = await this.RegisterOrUpdateReminder(ReminderName, reminderTime, reminderTime);
+ }
+
+ private async Task UnRegisterReminderAsync()
+ {
+ await TryToInitGrainReminderAsync();
+
+ if (grainReminder == null)
+ {
+ return;
+ }
+
+ try
+ {
+ await this.UnregisterReminder(grainReminder);
+ grainReminder = null;
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(1, ex, "Failed to unregister the reminder");
+ }
+ }
+
+ private async Task ResumeNoWaitAsync()
+ {
+ isActive = true;
+
+ try
+ {
+ if (State.NumCompletedActivities > 0)
+ {
+ await CheckForAbortAsync();
+ }
+
+ while (State.Status == SagaStatus.Executing ||
+ State.Status == SagaStatus.Compensating)
+ {
+ switch (State.Status)
+ {
+ case SagaStatus.Executing:
+ await ResumeExecuting();
+ break;
+ case SagaStatus.Compensating:
+ await ResumeCompensating();
+ break;
+ }
+ }
+
+ switch (State.Status)
+ {
+ case SagaStatus.NotStarted:
+ ResumeNotStarted();
+ break;
+ case SagaStatus.Executed:
+ case SagaStatus.Compensated:
+ case SagaStatus.Aborted:
+ ResumeCompleted();
+ break;
+ }
+
+ await UnRegisterReminderAsync();
+ }
+ finally
+ {
+ isActive = false;
+ }
+ }
+
+ private void ResumeNotStarted()
+ {
+ logger.LogError(0, $"Saga {this} is attempting to resume but was never started.");
+ }
+
+ private IActivity GetActivity(ActivityDefinition definition)
+ {
+ return (IActivity)serviceProvider.GetService(definition.Type);
+ }
+
+ private async Task ResumeExecuting()
+ {
+ while (State.NumCompletedActivities < State.Activities.Count)
+ {
+ var definition = State.Activities[State.NumCompletedActivities];
+ var currentActivity = GetActivity(definition);
+
+ try
+ {
+ logger.LogDebug($"Executing activity #{State.NumCompletedActivities} '{currentActivity.GetType().Name}'...");
+ var context = CreateActivityRuntimeContext(definition);
+ await currentActivity.Execute(context);
+ logger.LogDebug($"...activity #{State.NumCompletedActivities} '{currentActivity.GetType().Name}' complete.");
+ State.NumCompletedActivities++;
+ AddPropertiesToState(context);
+ await WriteStateAsync();
+ }
+ catch (Exception e)
+ {
+ logger.LogWarning(0, "Activity '" + currentActivity.GetType().Name + "' in saga '" + GetType().Name + "' failed with " + e.GetType().Name);
+ State.CompensationIndex = State.NumCompletedActivities;
+ State.Status = SagaStatus.Compensating;
+ AddActivityError(e);
+ await WriteStateAsync();
+
+ return;
+ }
+
+ // To ensure running first activity
+ if (await CheckForAbortAsync())
+ {
+ return;
+ }
+ }
+
+ if (await CheckForAbortAsync())
+ {
+ return;
+ }
+
+ State.Status = SagaStatus.Executed;
+ await WriteStateAsync();
+ }
+
+ private async Task CheckForAbortAsync()
+ {
+ if (await GetSagaCancellationGrain().HasAbortBeenRequested())
+ {
+ if (!State.HasBeenAborted)
+ {
+ State.HasBeenAborted = true;
+ State.Status = SagaStatus.Compensating;
+ State.CompensationIndex = State.NumCompletedActivities - 1;
+ await WriteStateAsync();
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ private async Task ResumeCompensating()
+ {
+ while (State.CompensationIndex >= 0)
+ {
+ var definition = State.Activities[State.CompensationIndex];
+ var currentActivity = GetActivity(definition);
+
+ try
+ {
+ logger.LogDebug(0, $"Compensating for activity #{State.CompensationIndex} '{currentActivity.GetType().Name}'...");
+ var context = CreateActivityRuntimeContext(definition);
+ await currentActivity.Compensate(context);
+ logger.LogDebug(0, $"...activity #{State.CompensationIndex} '{currentActivity.GetType().Name}' compensation complete.");
+ State.CompensationIndex--;
+ await WriteStateAsync();
+ }
+ catch (Exception ex)
+ {
+ logger.LogWarning(0, "Activity '" + currentActivity.GetType().Name + "' in saga '" + GetType().Name + "' failed while compensating with " + ex.GetType().Name, ex);
+ await Task.Delay(5000);
+ // TODO: handle compensation failure with expoential backoff.
+ // TODO: maybe eventual accept failure in a CompensationFailed state?
+ }
+ }
+
+ State.Status = State.HasBeenAborted
+ ? SagaStatus.Aborted
+ : SagaStatus.Compensated;
+ await WriteStateAsync();
+ }
+
+ private void AddPropertiesToState(ActivityContext context)
+ {
+ var propertyBag = (SagaPropertyBag)context.SagaProperties;
+ foreach (var property in propertyBag.ContextProperties)
+ {
+ State.Properties[property.Key] = property.Value;
+ }
+ }
+
+ private ActivityContext CreateActivityRuntimeContext(ActivityDefinition definition)
+ {
+ var propertyBag = (SagaPropertyBag)definition.Properties;
+ IEnumerable> properties = State.Properties;
+
+ if (propertyBag != null)
+ {
+ properties = properties.Concat(propertyBag.ContextProperties);
+ }
+
+ return new ActivityContext(
+ this.GetPrimaryKey(),
+ GrainFactory,
+ grainContextAccessor,
+ properties.ToDictionary(x => x.Key, y => y.Value)
+ );
+ }
+
+ private void ResumeCompleted()
+ {
+ logger.LogInformation($"Saga {this} has completed with status '{State.Status}'.");
+ }
+
+ private void AddActivityError(Exception exception)
+ {
+ try
+ {
+ State.Properties[SagaPropertyBagKeys.ActivityErrorPropertyKey] = _errorTranslator?.Translate(exception);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Failed to tranlsate exception.");
+ }
+ }
+
+ private async Task TryToInitGrainReminderAsync()
+ {
+ if (grainReminder != null)
+ {
+ return;
+ }
+
+ grainReminder = await this.GetReminder(ReminderName);
+ }
+ }
+}
diff --git a/Orleans.Sagas/SagaPropertyBag.cs b/Orleans.Sagas/SagaPropertyBag.cs
index 55a299d..e74c39e 100644
--- a/Orleans.Sagas/SagaPropertyBag.cs
+++ b/Orleans.Sagas/SagaPropertyBag.cs
@@ -3,10 +3,14 @@
namespace Orleans.Sagas
{
+ [GenerateSerializer]
+ [Alias("Orleans.Sagas.SagaPropertyBag")]
class SagaPropertyBag : ISagaPropertyBag
{
+ [Id(0)]
private readonly Dictionary existingProperties;
+ [Id(1)]
public Dictionary ContextProperties { get; }
public SagaPropertyBag() : this(new Dictionary())