diff --git a/.github/workflows/dotnet-core.yml b/.github/workflows/dotnet-core.yml index cffc94a..1a81fcc 100644 --- a/.github/workflows/dotnet-core.yml +++ b/.github/workflows/dotnet-core.yml @@ -16,7 +16,7 @@ jobs: - name: Setup .NET Core uses: actions/setup-dotnet@v1 with: - dotnet-version: 7.0.x + dotnet-version: 8.0.x - name: Install dependencies run: dotnet restore - name: Build diff --git a/Directory.Build.props b/Directory.Build.props index 3f33599..aed0064 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -5,8 +5,8 @@ - net7.0 - 7.0.0 + net8.0 + 8.0.0 diff --git a/Orleans.Sagas.Samples/Orleans.Sagas.Samples.csproj b/Orleans.Sagas.Samples/Orleans.Sagas.Samples.csproj index bb646c8..976bb97 100644 --- a/Orleans.Sagas.Samples/Orleans.Sagas.Samples.csproj +++ b/Orleans.Sagas.Samples/Orleans.Sagas.Samples.csproj @@ -10,11 +10,11 @@ - + - + diff --git a/Orleans.Sagas.Tests/Orleans.Sagas.Tests.csproj b/Orleans.Sagas.Tests/Orleans.Sagas.Tests.csproj index a5cb7e4..8619095 100644 --- a/Orleans.Sagas.Tests/Orleans.Sagas.Tests.csproj +++ b/Orleans.Sagas.Tests/Orleans.Sagas.Tests.csproj @@ -7,10 +7,10 @@ - - - - + + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Orleans.Sagas/ActivityDefinition.cs b/Orleans.Sagas/ActivityDefinition.cs index f30be8e..ecd5400 100644 --- a/Orleans.Sagas/ActivityDefinition.cs +++ b/Orleans.Sagas/ActivityDefinition.cs @@ -2,9 +2,13 @@ namespace Orleans.Sagas { + [GenerateSerializer] + [Alias("Orleans.Sagas.ActivityDefinition")] public class ActivityDefinition { + [Id(0)] public Type Type { get; } + [Id(1)] public ISagaPropertyBag Properties { get; } public ActivityDefinition(Type type, ISagaPropertyBag properties) diff --git a/Orleans.Sagas/Orleans.Sagas.csproj b/Orleans.Sagas/Orleans.Sagas.csproj index da06ee4..9794292 100644 --- a/Orleans.Sagas/Orleans.Sagas.csproj +++ b/Orleans.Sagas/Orleans.Sagas.csproj @@ -2,7 +2,7 @@ $(DefaultTargetFrameworkVersion) - 0.0.45-pre + 0.0.46-pre A distributed saga implementation for Orleans diff --git a/Orleans.Sagas/SagaGrain.cs b/Orleans.Sagas/SagaGrain.cs index 767cf2f..a30c4f7 100644 --- a/Orleans.Sagas/SagaGrain.cs +++ b/Orleans.Sagas/SagaGrain.cs @@ -1,361 +1,361 @@ -using Microsoft.Extensions.Logging; -using Orleans.Runtime; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.Serialization; -using System.Threading.Tasks; - -namespace Orleans.Sagas -{ - public sealed class SagaGrain : Grain, ISagaGrain - { - private static readonly string ReminderName = nameof(SagaGrain); - private readonly IGrainContextAccessor grainContextAccessor; - private readonly IServiceProvider serviceProvider; - private readonly ILogger logger; - private bool isActive; - private IGrainReminder grainReminder; - private IErrorTranslator _errorTranslator; - - public SagaGrain(IGrainContextAccessor grainContextAccessor, IServiceProvider serviceProvider, ILogger logger) - { - this.grainContextAccessor = grainContextAccessor; - this.serviceProvider = serviceProvider; - this.logger = logger; - } - - protected override async Task ReadStateAsync() - { - try - { - await base.ReadStateAsync(); - } - catch (Exception ex) when (ex is SerializationException || ex is InvalidCastException) - { - logger.LogError(1, ex, "Failed to read state"); - await HandleReadingStateError(); - } - } - - private async Task HandleReadingStateError() - { - try - { - await UnRegisterReminderAsync(); - isActive = false; - } - catch (Exception ex) - { - logger.LogError(1, ex, "Failed to handle reading state error"); - } - } - - public async Task RequestAbort() - { - logger.LogWarning(0, $"Saga {this} received an abort request."); - - // register abort request in separate grain in-case storage is mutating. - await GetSagaCancellationGrain().RequestAbort(); - - await ResumeAsync(); - } - - public async Task Execute(IEnumerable activities, ISagaPropertyBag sagaProperties, IErrorTranslator exceptionTranslator) - { - _errorTranslator = exceptionTranslator ?? new DefaultErrorTranslator(); - - if (State.Status == SagaStatus.NotStarted) - { - State.Activities = activities.ToList(); - State.Properties = sagaProperties is null - ? new Dictionary() - : ((SagaPropertyBag)sagaProperties).ContextProperties; - State.Status = SagaStatus.Executing; - await WriteStateAsync(); - await RegisterReminderAsync(); - } - - await ResumeAsync(); - } - - public Task GetStatus() - { - return Task.FromResult(State.Status); - } - - public async Task GetSagaError() - { - if (!State.Properties.ContainsKey(SagaPropertyBagKeys.ActivityErrorPropertyKey)) - { - await Task.CompletedTask; - return null; - } - - return State.Properties[SagaPropertyBagKeys.ActivityErrorPropertyKey]; - } - - public Task HasCompleted() - { - return Task.FromResult( - State.Status == SagaStatus.Aborted || - State.Status == SagaStatus.Compensated || - State.Status == SagaStatus.Executed - ); - } - - public async Task ReceiveReminder(string reminderName, TickStatus status) - { - await TryToInitGrainReminderAsync(); - - await ResumeAsync(); - } - - public Task ResumeAsync() - { - if (!isActive) - { - ResumeNoWaitAsync().Ignore(); - } - return Task.CompletedTask; - } - - public override string ToString() - { - return this.GetPrimaryKey().ToString(); - } - - private ISagaCancellationGrain GetSagaCancellationGrain() - { - return GrainFactory.GetGrain(this.GetPrimaryKey()); - } - - private async Task RegisterReminderAsync() - { - var reminderTime = TimeSpan.FromMinutes(1); - grainReminder = await this.RegisterOrUpdateReminder(ReminderName, reminderTime, reminderTime); - } - - private async Task UnRegisterReminderAsync() - { - await TryToInitGrainReminderAsync(); - - if (grainReminder == null) - { - return; - } - - try - { - await this.UnregisterReminder(grainReminder); - grainReminder = null; - } - catch (Exception ex) - { - logger.LogError(1, ex, "Failed to unregister the reminder"); - } - } - - private async Task ResumeNoWaitAsync() - { - isActive = true; - - try - { - if (State.NumCompletedActivities > 0) - { - await CheckForAbortAsync(); - } - - while (State.Status == SagaStatus.Executing || - State.Status == SagaStatus.Compensating) - { - switch (State.Status) - { - case SagaStatus.Executing: - await ResumeExecuting(); - break; - case SagaStatus.Compensating: - await ResumeCompensating(); - break; - } - } - - switch (State.Status) - { - case SagaStatus.NotStarted: - ResumeNotStarted(); - break; - case SagaStatus.Executed: - case SagaStatus.Compensated: - case SagaStatus.Aborted: - ResumeCompleted(); - break; - } - - await UnRegisterReminderAsync(); - } - finally - { - isActive = false; - } - } - - private void ResumeNotStarted() - { - logger.LogError(0, $"Saga {this} is attempting to resume but was never started."); - } - - private IActivity GetActivity(ActivityDefinition definition) - { - return (IActivity)serviceProvider.GetService(definition.Type); - } - - private async Task ResumeExecuting() - { - while (State.NumCompletedActivities < State.Activities.Count) - { - var definition = State.Activities[State.NumCompletedActivities]; - var currentActivity = GetActivity(definition); - - try - { - logger.LogDebug($"Executing activity #{State.NumCompletedActivities} '{currentActivity.GetType().Name}'..."); - var context = CreateActivityRuntimeContext(definition); - await currentActivity.Execute(context); - logger.LogDebug($"...activity #{State.NumCompletedActivities} '{currentActivity.GetType().Name}' complete."); - State.NumCompletedActivities++; - AddPropertiesToState(context); - await WriteStateAsync(); - } - catch (Exception e) - { - logger.LogWarning(0, "Activity '" + currentActivity.GetType().Name + "' in saga '" + GetType().Name + "' failed with " + e.GetType().Name); - State.CompensationIndex = State.NumCompletedActivities; - State.Status = SagaStatus.Compensating; - AddActivityError(e); - await WriteStateAsync(); - - return; - } - - // To ensure running first activity - if (await CheckForAbortAsync()) - { - return; - } - } - - if (await CheckForAbortAsync()) - { - return; - } - - State.Status = SagaStatus.Executed; - await WriteStateAsync(); - } - - private async Task CheckForAbortAsync() - { - if (await GetSagaCancellationGrain().HasAbortBeenRequested()) - { - if (!State.HasBeenAborted) - { - State.HasBeenAborted = true; - State.Status = SagaStatus.Compensating; - State.CompensationIndex = State.NumCompletedActivities - 1; - await WriteStateAsync(); - } - - return true; - } - - return false; - } - - private async Task ResumeCompensating() - { - while (State.CompensationIndex >= 0) - { - var definition = State.Activities[State.CompensationIndex]; - var currentActivity = GetActivity(definition); - - try - { - logger.LogDebug(0, $"Compensating for activity #{State.CompensationIndex} '{currentActivity.GetType().Name}'..."); - var context = CreateActivityRuntimeContext(definition); - await currentActivity.Compensate(context); - logger.LogDebug(0, $"...activity #{State.CompensationIndex} '{currentActivity.GetType().Name}' compensation complete."); - State.CompensationIndex--; - await WriteStateAsync(); - } - catch (Exception ex) - { - logger.LogWarning(0, "Activity '" + currentActivity.GetType().Name + "' in saga '" + GetType().Name + "' failed while compensating with " + ex.GetType().Name, ex); - await Task.Delay(5000); - // TODO: handle compensation failure with expoential backoff. - // TODO: maybe eventual accept failure in a CompensationFailed state? - } - } - - State.Status = State.HasBeenAborted - ? SagaStatus.Aborted - : SagaStatus.Compensated; - await WriteStateAsync(); - } - - private void AddPropertiesToState(ActivityContext context) - { - var propertyBag = (SagaPropertyBag)context.SagaProperties; - foreach (var property in propertyBag.ContextProperties) - { - State.Properties[property.Key] = property.Value; - } - } - - private ActivityContext CreateActivityRuntimeContext(ActivityDefinition definition) - { - var propertyBag = (SagaPropertyBag)definition.Properties; - IEnumerable> properties = State.Properties; - - if (propertyBag != null) - { - properties = properties.Concat(propertyBag.ContextProperties); - } - - return new ActivityContext( - this.GetPrimaryKey(), - GrainFactory, - grainContextAccessor, - properties.ToDictionary(x => x.Key, y => y.Value) - ); - } - - private void ResumeCompleted() - { - logger.LogInformation($"Saga {this} has completed with status '{State.Status}'."); - } - - private void AddActivityError(Exception exception) - { - try - { - State.Properties[SagaPropertyBagKeys.ActivityErrorPropertyKey] = _errorTranslator?.Translate(exception); - } - catch (Exception ex) - { - logger.LogError(ex, "Failed to tranlsate exception."); - } - } - - private async Task TryToInitGrainReminderAsync() - { - if (grainReminder != null) - { - return; - } - - grainReminder = await this.GetReminder(ReminderName); - } - } -} +using Microsoft.Extensions.Logging; +using Orleans.Runtime; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.Serialization; +using System.Threading.Tasks; + +namespace Orleans.Sagas +{ + public sealed class SagaGrain : Grain, ISagaGrain + { + private static readonly string ReminderName = nameof(SagaGrain); + private readonly IGrainContextAccessor grainContextAccessor; + private readonly IServiceProvider serviceProvider; + private readonly ILogger logger; + private bool isActive; + private IGrainReminder grainReminder; + private IErrorTranslator _errorTranslator; + + public SagaGrain(IGrainContextAccessor grainContextAccessor, IServiceProvider serviceProvider, ILogger logger) + { + this.grainContextAccessor = grainContextAccessor; + this.serviceProvider = serviceProvider; + this.logger = logger; + } + + protected override async Task ReadStateAsync() + { + try + { + await base.ReadStateAsync(); + } + catch (Exception ex) when (ex is SerializationException || ex is InvalidCastException) + { + logger.LogError(1, ex, "Failed to read state"); + await HandleReadingStateError(); + } + } + + private async Task HandleReadingStateError() + { + try + { + await UnRegisterReminderAsync(); + isActive = false; + } + catch (Exception ex) + { + logger.LogError(1, ex, "Failed to handle reading state error"); + } + } + + public async Task RequestAbort() + { + logger.LogWarning(0, $"Saga {this} received an abort request."); + + // register abort request in separate grain in-case storage is mutating. + await GetSagaCancellationGrain().RequestAbort(); + + await ResumeAsync(); + } + + public async Task Execute(IEnumerable activities, ISagaPropertyBag sagaProperties, IErrorTranslator exceptionTranslator) + { + _errorTranslator = exceptionTranslator ?? new DefaultErrorTranslator(); + + if (State.Status == SagaStatus.NotStarted) + { + State.Activities = activities.ToList(); + State.Properties = sagaProperties is null + ? new Dictionary() + : ((SagaPropertyBag)sagaProperties).ContextProperties; + State.Status = SagaStatus.Executing; + await WriteStateAsync(); + await RegisterReminderAsync(); + } + + await ResumeAsync(); + } + + public Task GetStatus() + { + return Task.FromResult(State.Status); + } + + public async Task GetSagaError() + { + if (!State.Properties.ContainsKey(SagaPropertyBagKeys.ActivityErrorPropertyKey)) + { + await Task.CompletedTask; + return null; + } + + return State.Properties[SagaPropertyBagKeys.ActivityErrorPropertyKey]; + } + + public Task HasCompleted() + { + return Task.FromResult( + State.Status == SagaStatus.Aborted || + State.Status == SagaStatus.Compensated || + State.Status == SagaStatus.Executed + ); + } + + public async Task ReceiveReminder(string reminderName, TickStatus status) + { + await TryToInitGrainReminderAsync(); + + await ResumeAsync(); + } + + public Task ResumeAsync() + { + if (!isActive) + { + ResumeNoWaitAsync().Ignore(); + } + return Task.CompletedTask; + } + + public override string ToString() + { + return this.GetPrimaryKey().ToString(); + } + + private ISagaCancellationGrain GetSagaCancellationGrain() + { + return GrainFactory.GetGrain(this.GetPrimaryKey()); + } + + private async Task RegisterReminderAsync() + { + var reminderTime = TimeSpan.FromMinutes(1); + grainReminder = await this.RegisterOrUpdateReminder(ReminderName, reminderTime, reminderTime); + } + + private async Task UnRegisterReminderAsync() + { + await TryToInitGrainReminderAsync(); + + if (grainReminder == null) + { + return; + } + + try + { + await this.UnregisterReminder(grainReminder); + grainReminder = null; + } + catch (Exception ex) + { + logger.LogError(1, ex, "Failed to unregister the reminder"); + } + } + + private async Task ResumeNoWaitAsync() + { + isActive = true; + + try + { + if (State.NumCompletedActivities > 0) + { + await CheckForAbortAsync(); + } + + while (State.Status == SagaStatus.Executing || + State.Status == SagaStatus.Compensating) + { + switch (State.Status) + { + case SagaStatus.Executing: + await ResumeExecuting(); + break; + case SagaStatus.Compensating: + await ResumeCompensating(); + break; + } + } + + switch (State.Status) + { + case SagaStatus.NotStarted: + ResumeNotStarted(); + break; + case SagaStatus.Executed: + case SagaStatus.Compensated: + case SagaStatus.Aborted: + ResumeCompleted(); + break; + } + + await UnRegisterReminderAsync(); + } + finally + { + isActive = false; + } + } + + private void ResumeNotStarted() + { + logger.LogError(0, $"Saga {this} is attempting to resume but was never started."); + } + + private IActivity GetActivity(ActivityDefinition definition) + { + return (IActivity)serviceProvider.GetService(definition.Type); + } + + private async Task ResumeExecuting() + { + while (State.NumCompletedActivities < State.Activities.Count) + { + var definition = State.Activities[State.NumCompletedActivities]; + var currentActivity = GetActivity(definition); + + try + { + logger.LogDebug($"Executing activity #{State.NumCompletedActivities} '{currentActivity.GetType().Name}'..."); + var context = CreateActivityRuntimeContext(definition); + await currentActivity.Execute(context); + logger.LogDebug($"...activity #{State.NumCompletedActivities} '{currentActivity.GetType().Name}' complete."); + State.NumCompletedActivities++; + AddPropertiesToState(context); + await WriteStateAsync(); + } + catch (Exception e) + { + logger.LogWarning(0, "Activity '" + currentActivity.GetType().Name + "' in saga '" + GetType().Name + "' failed with " + e.GetType().Name); + State.CompensationIndex = State.NumCompletedActivities; + State.Status = SagaStatus.Compensating; + AddActivityError(e); + await WriteStateAsync(); + + return; + } + + // To ensure running first activity + if (await CheckForAbortAsync()) + { + return; + } + } + + if (await CheckForAbortAsync()) + { + return; + } + + State.Status = SagaStatus.Executed; + await WriteStateAsync(); + } + + private async Task CheckForAbortAsync() + { + if (await GetSagaCancellationGrain().HasAbortBeenRequested()) + { + if (!State.HasBeenAborted) + { + State.HasBeenAborted = true; + State.Status = SagaStatus.Compensating; + State.CompensationIndex = State.NumCompletedActivities - 1; + await WriteStateAsync(); + } + + return true; + } + + return false; + } + + private async Task ResumeCompensating() + { + while (State.CompensationIndex >= 0) + { + var definition = State.Activities[State.CompensationIndex]; + var currentActivity = GetActivity(definition); + + try + { + logger.LogDebug(0, $"Compensating for activity #{State.CompensationIndex} '{currentActivity.GetType().Name}'..."); + var context = CreateActivityRuntimeContext(definition); + await currentActivity.Compensate(context); + logger.LogDebug(0, $"...activity #{State.CompensationIndex} '{currentActivity.GetType().Name}' compensation complete."); + State.CompensationIndex--; + await WriteStateAsync(); + } + catch (Exception ex) + { + logger.LogWarning(0, "Activity '" + currentActivity.GetType().Name + "' in saga '" + GetType().Name + "' failed while compensating with " + ex.GetType().Name, ex); + await Task.Delay(5000); + // TODO: handle compensation failure with expoential backoff. + // TODO: maybe eventual accept failure in a CompensationFailed state? + } + } + + State.Status = State.HasBeenAborted + ? SagaStatus.Aborted + : SagaStatus.Compensated; + await WriteStateAsync(); + } + + private void AddPropertiesToState(ActivityContext context) + { + var propertyBag = (SagaPropertyBag)context.SagaProperties; + foreach (var property in propertyBag.ContextProperties) + { + State.Properties[property.Key] = property.Value; + } + } + + private ActivityContext CreateActivityRuntimeContext(ActivityDefinition definition) + { + var propertyBag = (SagaPropertyBag)definition.Properties; + IEnumerable> properties = State.Properties; + + if (propertyBag != null) + { + properties = properties.Concat(propertyBag.ContextProperties); + } + + return new ActivityContext( + this.GetPrimaryKey(), + GrainFactory, + grainContextAccessor, + properties.ToDictionary(x => x.Key, y => y.Value) + ); + } + + private void ResumeCompleted() + { + logger.LogInformation($"Saga {this} has completed with status '{State.Status}'."); + } + + private void AddActivityError(Exception exception) + { + try + { + State.Properties[SagaPropertyBagKeys.ActivityErrorPropertyKey] = _errorTranslator?.Translate(exception); + } + catch (Exception ex) + { + logger.LogError(ex, "Failed to tranlsate exception."); + } + } + + private async Task TryToInitGrainReminderAsync() + { + if (grainReminder != null) + { + return; + } + + grainReminder = await this.GetReminder(ReminderName); + } + } +} diff --git a/Orleans.Sagas/SagaPropertyBag.cs b/Orleans.Sagas/SagaPropertyBag.cs index 55a299d..e74c39e 100644 --- a/Orleans.Sagas/SagaPropertyBag.cs +++ b/Orleans.Sagas/SagaPropertyBag.cs @@ -3,10 +3,14 @@ namespace Orleans.Sagas { + [GenerateSerializer] + [Alias("Orleans.Sagas.SagaPropertyBag")] class SagaPropertyBag : ISagaPropertyBag { + [Id(0)] private readonly Dictionary existingProperties; + [Id(1)] public Dictionary ContextProperties { get; } public SagaPropertyBag() : this(new Dictionary())