diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props
index e8836010a05b..24dee365ad2d 100644
--- a/dotnet/Directory.Packages.props
+++ b/dotnet/Directory.Packages.props
@@ -12,6 +12,7 @@
+
@@ -47,6 +48,7 @@
+
diff --git a/dotnet/SK-dotnet.sln b/dotnet/SK-dotnet.sln
index 0844db359552..a5ff8dedaf4b 100644
--- a/dotnet/SK-dotnet.sln
+++ b/dotnet/SK-dotnet.sln
@@ -437,6 +437,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "sk-chatgpt-azure-function",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kernel-functions-generator", "samples\Demos\CreateChatGptPlugin\MathPlugin\kernel-functions-generator\kernel-functions-generator.csproj", "{78785CB1-66CF-4895-D7E5-A440DD84BE86}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProcessWithCloudEvents", "samples\Demos\ProcessWithCloudEvents\ProcessWithCloudEvents.csproj", "{065E7F63-3475-4EEE-93EE-D2A4BF7AA538}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -1164,6 +1166,18 @@ Global
{78785CB1-66CF-4895-D7E5-A440DD84BE86}.Publish|Any CPU.Build.0 = Debug|Any CPU
{78785CB1-66CF-4895-D7E5-A440DD84BE86}.Release|Any CPU.ActiveCfg = Release|Any CPU
{78785CB1-66CF-4895-D7E5-A440DD84BE86}.Release|Any CPU.Build.0 = Release|Any CPU
+ {065E7F63-3475-4EEE-93EE-D2A4BF7AA538}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {065E7F63-3475-4EEE-93EE-D2A4BF7AA538}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {065E7F63-3475-4EEE-93EE-D2A4BF7AA538}.Publish|Any CPU.ActiveCfg = Debug|Any CPU
+ {065E7F63-3475-4EEE-93EE-D2A4BF7AA538}.Publish|Any CPU.Build.0 = Debug|Any CPU
+ {065E7F63-3475-4EEE-93EE-D2A4BF7AA538}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {065E7F63-3475-4EEE-93EE-D2A4BF7AA538}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D1786E2B-CAA0-4B2D-A974-9845EB9E420F}.Debug|Any CPU.ActiveCfg = Debug
+ {D1786E2B-CAA0-4B2D-A974-9845EB9E420F}.Debug|Any CPU.Build.0 = Debug
+ {D1786E2B-CAA0-4B2D-A974-9845EB9E420F}.Publish|Any CPU.ActiveCfg = Debug
+ {D1786E2B-CAA0-4B2D-A974-9845EB9E420F}.Publish|Any CPU.Build.0 = Debug
+ {D1786E2B-CAA0-4B2D-A974-9845EB9E420F}.Release|Any CPU.ActiveCfg = Release
+ {D1786E2B-CAA0-4B2D-A974-9845EB9E420F}.Release|Any CPU.Build.0 = Release
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -1324,6 +1338,7 @@ Global
{02EA681E-C7D8-13C7-8484-4AC65E1B71E8} = {5D4C0700-BBB5-418F-A7B2-F392B9A18263}
{2EB6E4C2-606D-B638-2E08-49EA2061C428} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
{78785CB1-66CF-4895-D7E5-A440DD84BE86} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
+ {065E7F63-3475-4EEE-93EE-D2A4BF7AA538} = {5D4C0700-BBB5-418F-A7B2-F392B9A18263}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FBDC56A3-86AD-4323-AA0F-201E59123B83}
diff --git a/dotnet/dapr.yaml b/dotnet/dapr.yaml
new file mode 100644
index 000000000000..8522f60e7b75
--- /dev/null
+++ b/dotnet/dapr.yaml
@@ -0,0 +1,11 @@
+apps:
+- appDirPath: samples\Demos\ProcessWithDapr
+ appID: processwithdapr
+ appPort: 58641
+ httpPort: 5000
+ command:
+ - dotnet
+ - run
+ resourcesPaths:
+ - ./components
+version: 1
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/AppConfig.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/AppConfig.cs
new file mode 100644
index 000000000000..d9d980ce5075
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/AppConfig.cs
@@ -0,0 +1,58 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+internal sealed class AppConfig
+{
+ ///
+ /// The configuration for the Azure EntraId authentication.
+ ///
+ public AzureEntraIdConfig? AzureEntraId { get; set; }
+
+ ///
+ /// Ensures that the configuration is valid.
+ ///
+ internal void Validate()
+ {
+ ArgumentNullException.ThrowIfNull(this.AzureEntraId?.ClientId, nameof(this.AzureEntraId.ClientId));
+ ArgumentNullException.ThrowIfNull(this.AzureEntraId?.TenantId, nameof(this.AzureEntraId.TenantId));
+
+ if (this.AzureEntraId.InteractiveBrowserAuthentication)
+ {
+ ArgumentNullException.ThrowIfNull(this.AzureEntraId.InteractiveBrowserRedirectUri, nameof(this.AzureEntraId.InteractiveBrowserRedirectUri));
+ }
+ else
+ {
+ ArgumentNullException.ThrowIfNull(this.AzureEntraId?.ClientSecret, nameof(this.AzureEntraId.ClientSecret));
+ }
+ }
+
+ internal sealed class AzureEntraIdConfig
+ {
+ ///
+ /// App Registration Client Id
+ ///
+ public string? ClientId { get; set; }
+
+ ///
+ /// App Registration Tenant Id
+ ///
+ public string? TenantId { get; set; }
+
+ ///
+ /// The client secret to use for the Azure EntraId authentication.
+ ///
+ ///
+ /// This is required if InteractiveBrowserAuthentication is false. (App Authentication)
+ ///
+ public string? ClientSecret { get; set; }
+
+ ///
+ /// Specifies whether to use interactive browser authentication (Delegated User Authentication) or App authentication.
+ ///
+ public bool InteractiveBrowserAuthentication { get; set; }
+
+ ///
+ /// When using interactive browser authentication, the redirect URI to use.
+ ///
+ public string? InteractiveBrowserRedirectUri { get; set; } = "http://localhost";
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterBaseController.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterBaseController.cs
new file mode 100644
index 000000000000..98ef47862db0
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterBaseController.cs
@@ -0,0 +1,135 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System.Text.Json;
+using Microsoft.AspNetCore.Mvc;
+using Microsoft.Graph;
+using Microsoft.SemanticKernel;
+using Microsoft.SemanticKernel.Process.Models;
+using ProcessWithCloudEvents.Processes;
+using ProcessWithCloudEvents.Processes.Steps;
+
+namespace ProcessWithCloudEvents.Controllers;
+///
+/// Base class that contains common methods to be used when using SK Processes and Counter common api entrypoints
+///
+public abstract class CounterBaseController : ControllerBase
+{
+ ///
+ /// Kernel to be used to run the SK Process
+ ///
+ internal Kernel Kernel { get; init; }
+
+ ///
+ /// SK Process to be used to hold the counter logic
+ ///
+ internal KernelProcess Process { get; init; }
+
+ private static readonly JsonSerializerOptions s_jsonOptions = new()
+ {
+ WriteIndented = true,
+ DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull
+ };
+
+ internal Kernel BuildKernel(GraphServiceClient? graphClient = null)
+ {
+ var builder = Kernel.CreateBuilder();
+ if (graphClient != null)
+ {
+ builder.Services.AddSingleton(graphClient);
+ }
+ return builder.Build();
+ }
+
+ internal KernelProcess InitializeProcess(ProcessBuilder process)
+ {
+ this.InitializeStateFile(process.Name);
+ var processState = this.LoadProcessState(process.Name);
+ return process.Build(processState);
+ }
+
+ private string GetTemporaryProcessFilePath(string processName)
+ {
+ return Path.Combine(Path.GetTempPath(), $"{processName}.json");
+ }
+
+ internal void InitializeStateFile(string processName)
+ {
+ // Initialize the path for the temporary file
+ var tempProcessFile = this.GetTemporaryProcessFilePath(processName);
+
+ // If the file does not exist, create it and initialize with zero
+ if (!System.IO.File.Exists(tempProcessFile))
+ {
+ System.IO.File.WriteAllText(tempProcessFile, "");
+ }
+ }
+
+ internal void SaveProcessState(string processName, KernelProcessStateMetadata processStateInfo)
+ {
+ var content = JsonSerializer.Serialize(processStateInfo, s_jsonOptions);
+ System.IO.File.WriteAllText(this.GetTemporaryProcessFilePath(processName), content);
+ }
+
+ internal KernelProcessStateMetadata? LoadProcessState(string processName)
+ {
+ try
+ {
+ using StreamReader reader = new(this.GetTemporaryProcessFilePath(processName));
+ var content = reader.ReadToEnd();
+ return JsonSerializer.Deserialize(content, s_jsonOptions);
+ }
+ catch (Exception)
+ {
+ return null;
+ }
+ }
+
+ internal void StoreProcessState(KernelProcess process)
+ {
+ var stateMetadata = process.ToProcessStateMetadata();
+ this.SaveProcessState(process.State.Name, stateMetadata);
+ }
+
+ internal KernelProcessStepState? GetCounterState(KernelProcess process)
+ {
+ // TODO: Replace when there is a better way of extracting snapshot of local state
+ return process.Steps
+ .First(step => step.State.Name == RequestCounterProcess.StepNames.Counter).State as KernelProcessStepState;
+ }
+
+ internal async Task StartProcessWithEventAsync(string eventName, object? eventData = null)
+ {
+ var runningProcess = await this.Process.StartAsync(this.Kernel, new() { Id = eventName, Data = eventData });
+ var processState = await runningProcess.GetStateAsync();
+ this.StoreProcessState(processState);
+
+ return processState;
+ }
+
+ ///
+ /// API entry point to increase the counter
+ ///
+ /// current counter value
+ public virtual async Task IncreaseCounterAsync()
+ {
+ return await Task.FromResult(0);
+ }
+
+ ///
+ /// API entry point to decrease the counter
+ ///
+ /// current counter value
+ public virtual async Task DecreaseCounterAsync()
+ {
+ return await Task.FromResult(0);
+ }
+
+ ///
+ /// API entry point to reset counter value to 0
+ ///
+ /// current counter value
+ public virtual async Task ResetCounterAsync()
+ {
+ return await Task.FromResult(0);
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterWithCloudStepsController.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterWithCloudStepsController.cs
new file mode 100644
index 000000000000..9b069851180e
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterWithCloudStepsController.cs
@@ -0,0 +1,54 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.AspNetCore.Mvc;
+using Microsoft.Graph;
+using ProcessWithCloudEvents.Processes;
+
+namespace ProcessWithCloudEvents.Controllers;
+[ApiController]
+[Route("[controller]")]
+public class CounterWithCloudStepsController : CounterBaseController
+{
+ private readonly ILogger _logger;
+
+ public CounterWithCloudStepsController(ILogger logger, GraphServiceClient graphClient)
+ {
+ this._logger = logger;
+
+ this.Kernel = this.BuildKernel(graphClient);
+ this.Process = this.InitializeProcess(RequestCounterProcess.CreateProcessWithCloudSteps());
+ }
+
+ ///
+ [HttpGet("increase", Name = "IncreaseWithCloudSteps")]
+ public override async Task IncreaseCounterAsync()
+ {
+ var eventName = RequestCounterProcess.GetEventName(RequestCounterProcess.CounterProcessEvents.IncreaseCounterRequest);
+ var runningProcess = await this.StartProcessWithEventAsync(eventName);
+ var counterState = this.GetCounterState(runningProcess);
+
+ return counterState?.State?.Counter ?? -1;
+ }
+
+ ///
+ [HttpGet("decrease", Name = "DecreaseWithCloudSteps")]
+ public override async Task DecreaseCounterAsync()
+ {
+ var eventName = RequestCounterProcess.GetEventName(RequestCounterProcess.CounterProcessEvents.DecreaseCounterRequest);
+ var runningProcess = await this.StartProcessWithEventAsync(eventName);
+ var counterState = this.GetCounterState(runningProcess);
+
+ return counterState?.State?.Counter ?? -1;
+ }
+
+ ///
+ [HttpGet("reset", Name = "ResetCounterWithCloudSteps")]
+ public override async Task ResetCounterAsync()
+ {
+ var eventName = RequestCounterProcess.GetEventName(RequestCounterProcess.CounterProcessEvents.ResetCounterRequest);
+ var runningProcess = await this.StartProcessWithEventAsync(eventName);
+ var counterState = this.GetCounterState(runningProcess);
+
+ return counterState?.State?.Counter ?? -1;
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterWithCloudSubscribersController.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterWithCloudSubscribersController.cs
new file mode 100644
index 000000000000..bc57705e0a34
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/Controllers/CounterWithCloudSubscribersController.cs
@@ -0,0 +1,57 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.AspNetCore.Mvc;
+using Microsoft.Graph;
+using ProcessWithCloudEvents.Processes;
+
+namespace ProcessWithCloudEvents.Controllers;
+[ApiController]
+[Route("[controller]")]
+public class CounterWithCloudSubscribersController : CounterBaseController
+{
+ private readonly ILogger _logger;
+
+ public CounterWithCloudSubscribersController(ILogger logger, GraphServiceClient graphClient)
+ {
+ this._logger = logger;
+ this.Kernel = this.BuildKernel();
+
+ var serviceProvider = new ServiceCollection()
+ .AddSingleton(graphClient)
+ .BuildServiceProvider();
+ this.Process = this.InitializeProcess(RequestCounterProcess.CreateProcessWithProcessSubscriber(serviceProvider));
+ }
+
+ ///
+ [HttpGet("increase", Name = "IncreaseCounterWithCloudSubscribers")]
+ public override async Task IncreaseCounterAsync()
+ {
+ var eventName = RequestCounterProcess.GetEventName(RequestCounterProcess.CounterProcessEvents.IncreaseCounterRequest);
+ var runningProcess = await this.StartProcessWithEventAsync(eventName);
+ var counterState = this.GetCounterState(runningProcess);
+
+ return counterState?.State?.Counter ?? -1;
+ }
+
+ ///
+ [HttpGet("decrease", Name = "DecreaseCounterWithCloudSubscribers")]
+ public override async Task DecreaseCounterAsync()
+ {
+ var eventName = RequestCounterProcess.GetEventName(RequestCounterProcess.CounterProcessEvents.DecreaseCounterRequest);
+ var runningProcess = await this.StartProcessWithEventAsync(eventName);
+ var counterState = this.GetCounterState(runningProcess);
+
+ return counterState?.State?.Counter ?? -1;
+ }
+
+ ///
+ [HttpGet("reset", Name = "ResetCounterWithCloudSubscribers")]
+ public override async Task ResetCounterAsync()
+ {
+ var eventName = RequestCounterProcess.GetEventName(RequestCounterProcess.CounterProcessEvents.ResetCounterRequest);
+ var runningProcess = await this.StartProcessWithEventAsync(eventName);
+ var counterState = this.GetCounterState(runningProcess);
+
+ return counterState?.State?.Counter ?? -1;
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/GraphServiceProvider.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/GraphServiceProvider.cs
new file mode 100644
index 000000000000..470352b928b6
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/GraphServiceProvider.cs
@@ -0,0 +1,51 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Azure.Core;
+using Azure.Identity;
+using Microsoft.Graph;
+
+public static class GraphServiceProvider
+{
+ public static GraphServiceClient CreateGraphService()
+ {
+ string[] scopes;
+
+ var config = new ConfigurationBuilder()
+ .SetBasePath(Directory.GetCurrentDirectory()) // Set the base path for appsettings.json
+ .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) // Load appsettings.json
+ .AddUserSecrets()
+ .AddEnvironmentVariables()
+ .Build()
+ .Get() ??
+ throw new InvalidOperationException("Configuration is not setup correctly.");
+
+ config.Validate();
+
+ TokenCredential credential = null!;
+ if (config.AzureEntraId!.InteractiveBrowserAuthentication) // Authentication As User
+ {
+ /// Use this if using user delegated permissions
+ scopes = ["User.Read", "Mail.Send"];
+
+ credential = new InteractiveBrowserCredential(
+ new InteractiveBrowserCredentialOptions
+ {
+ TenantId = config.AzureEntraId.TenantId,
+ ClientId = config.AzureEntraId.ClientId,
+ AuthorityHost = AzureAuthorityHosts.AzurePublicCloud,
+ RedirectUri = new Uri(config.AzureEntraId.InteractiveBrowserRedirectUri!)
+ });
+ }
+ else // Authentication As Application
+ {
+ scopes = ["https://graph.microsoft.com/.default"];
+
+ credential = new ClientSecretCredential(
+ config.AzureEntraId.TenantId,
+ config.AzureEntraId.ClientId,
+ config.AzureEntraId.ClientSecret);
+ }
+
+ return new GraphServiceClient(credential, scopes);
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/MicrosoftGraph/GraphRequestFactory.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/MicrosoftGraph/GraphRequestFactory.cs
new file mode 100644
index 000000000000..f7253d3e2833
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/MicrosoftGraph/GraphRequestFactory.cs
@@ -0,0 +1,39 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.Graph.Me.SendMail;
+using Microsoft.Graph.Models;
+
+namespace ProcessWithCloudEvents.MicrosoftGraph;
+
+///
+/// Factory that creates Microsoft Graph related objects
+///
+public static class GraphRequestFactory
+{
+ ///
+ /// Method that creates MailPost Body with defined subject, content and recipients
+ ///
+ /// subject of the email
+ /// content of the email
+ /// recipients of the email
+ ///
+ public static SendMailPostRequestBody CreateEmailBody(string subject, string content, List recipients)
+ {
+ var message = new SendMailPostRequestBody()
+ {
+ Message = new Microsoft.Graph.Models.Message()
+ {
+ Subject = subject,
+ Body = new()
+ {
+ ContentType = Microsoft.Graph.Models.BodyType.Text,
+ Content = content,
+ },
+ ToRecipients = recipients.Select(address => new Recipient { EmailAddress = new() { Address = address } }).ToList(),
+ },
+ SaveToSentItems = true,
+ };
+
+ return message;
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.csproj b/dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.csproj
new file mode 100644
index 000000000000..0d8d4711ef5b
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.csproj
@@ -0,0 +1,25 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+ $(NoWarn);CA2007,CA1861,CA1050,VSTHRD111,SKEXP0001,SKEXP0010,SKEXP0020,SKEXP0040,SKEXP0050,SKEXP0060,SKEXP0070,SKEXP0080,SKEXP0110
+
+ 5ee045b0-aea3-4f08-8d31-32d1a6f8fed0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.http b/dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.http
new file mode 100644
index 000000000000..7111d4bcb145
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.http
@@ -0,0 +1,24 @@
+@ProcessWithCloudEvents_HostAddress = http://localhost:5077
+
+GET {{ProcessWithCloudEvents_HostAddress}}/CounterWithCloudSteps/increase
+Accept: application/json
+
+###
+GET {{ProcessWithCloudEvents_HostAddress}}/CounterWithCloudSteps/decrease
+Accept: application/json
+
+###
+GET {{ProcessWithCloudEvents_HostAddress}}/CounterWithCloudSteps/reset
+Accept: application/json
+
+###
+GET {{ProcessWithCloudEvents_HostAddress}}/CounterWithCloudSubscribers/increase
+Accept: application/json
+
+###
+GET {{ProcessWithCloudEvents_HostAddress}}/CounterWithCloudSubscribers/decrease
+Accept: application/json
+
+###
+GET {{ProcessWithCloudEvents_HostAddress}}/CounterWithCloudSubscribers/reset
+Accept: application/json
\ No newline at end of file
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/RequestCounterProcess.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/RequestCounterProcess.cs
new file mode 100644
index 000000000000..3ef4ed655de0
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/RequestCounterProcess.cs
@@ -0,0 +1,169 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.Graph;
+using Microsoft.Graph.Me.SendMail;
+using Microsoft.SemanticKernel;
+using Microsoft.SemanticKernel.Process;
+using ProcessWithCloudEvents.MicrosoftGraph;
+using ProcessWithCloudEvents.Processes.Steps;
+
+namespace ProcessWithCloudEvents.Processes;
+
+public static class RequestCounterProcess
+{
+ public static class StepNames
+ {
+ public const string Counter = nameof(Counter);
+ public const string CounterInterceptor = nameof(CounterInterceptor);
+ public const string SendEmail = nameof(SendEmail);
+ }
+
+ public enum CounterProcessEvents
+ {
+ IncreaseCounterRequest,
+ DecreaseCounterRequest,
+ ResetCounterRequest,
+ OnCounterReset,
+ OnCounterResult
+ }
+
+ public static string GetEventName(CounterProcessEvents processEvent)
+ {
+ return Enum.GetName(processEvent) ?? "";
+ }
+
+ public static ProcessBuilder CreateProcessWithCloudSteps()
+ {
+ var processBuilder = new ProcessBuilder("RequestCounterProcess");
+
+ var counterStep = processBuilder.AddStepFromType(StepNames.Counter);
+ var counterInterceptorStep = processBuilder.AddStepFromType(StepNames.CounterInterceptor);
+ var emailSenderStep = processBuilder.AddStepFromType(StepNames.SendEmail);
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.IncreaseCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.IncreaseCounter));
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.DecreaseCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.DecreaseCounter));
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.ResetCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.ResetCounter));
+
+ counterStep
+ .OnFunctionResult(CounterStep.StepFunctions.IncreaseCounter)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterInterceptorStep));
+
+ counterStep
+ .OnFunctionResult(CounterStep.StepFunctions.DecreaseCounter)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterInterceptorStep));
+
+ counterStep
+ .OnFunctionResult(CounterStep.StepFunctions.ResetCounter)
+ .SendEventTo(new ProcessFunctionTargetBuilder(emailSenderStep, SendEmailStep.StepFunctions.SendCounterResetEmail));
+
+ counterInterceptorStep
+ .OnFunctionResult(CounterInterceptorStep.StepFunctions.InterceptCounter)
+ .SendEventTo(new ProcessFunctionTargetBuilder(emailSenderStep, SendEmailStep.StepFunctions.SendCounterChangeEmail));
+
+ return processBuilder;
+ }
+
+ public static ProcessBuilder CreateProcessWithProcessSubscriber(IServiceProvider serviceProvider)
+ {
+ var processBuilder = new ProcessBuilder("CounterWithProcessSubscriber");
+
+ var counterStep = processBuilder.AddStepFromType(StepNames.Counter);
+ var counterInterceptorStep = processBuilder.AddStepFromType(StepNames.CounterInterceptor);
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.IncreaseCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.IncreaseCounter));
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.DecreaseCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.DecreaseCounter));
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.ResetCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.ResetCounter));
+
+ counterStep
+ .OnFunctionResult(CounterStep.StepFunctions.IncreaseCounter)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterInterceptorStep));
+
+ counterStep
+ .OnFunctionResult(CounterStep.StepFunctions.DecreaseCounter)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterInterceptorStep));
+
+ counterStep
+ .OnFunctionResult(CounterStep.StepFunctions.ResetCounter)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(CounterProcessEvents.OnCounterReset))
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterInterceptorStep));
+
+ counterInterceptorStep
+ .OnFunctionResult(CounterInterceptorStep.StepFunctions.InterceptCounter)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(CounterProcessEvents.OnCounterResult));
+
+ processBuilder.LinkEventSubscribersFromType(serviceProvider);
+
+ return processBuilder;
+ }
+
+ public class CounterProcessSubscriber : KernelProcessEventsSubscriber
+ {
+ private SendMailPostRequestBody GenerateEmailRequest(int counter, string emailAddress, string subject)
+ {
+ var message = GraphRequestFactory.CreateEmailBody(
+ subject: $"{subject} - using SK event subscribers",
+ content: $"The counter is {counter}",
+ recipients: [emailAddress]);
+
+ return message;
+ }
+
+ [ProcessEventSubscriber(CounterProcessEvents.OnCounterResult)]
+ public async Task OnCounterResultReceivedAsync(int? counterResult)
+ {
+ if (!counterResult.HasValue)
+ {
+ return;
+ }
+
+ try
+ {
+ var graphClient = this.ServiceProvider?.GetRequiredService();
+ var user = await graphClient?.Me.GetAsync();
+ var graphEmailMessage = this.GenerateEmailRequest(counterResult.Value, user!.Mail!, subject: "The counter has changed");
+ await graphClient?.Me.SendMail.PostAsync(graphEmailMessage);
+ }
+ catch (Exception e)
+ {
+ throw new KernelException($"Something went wrong and couldn't send email - {e}");
+ }
+ }
+
+ [ProcessEventSubscriber(CounterProcessEvents.OnCounterReset)]
+ public async Task OnCounterResetReceivedAsync(int? counterResult)
+ {
+ if (!counterResult.HasValue)
+ {
+ return;
+ }
+
+ try
+ {
+ var graphClient = this.ServiceProvider?.GetRequiredService();
+ var user = await graphClient.Me.GetAsync();
+ var graphEmailMessage = this.GenerateEmailRequest(counterResult.Value, user!.Mail!, subject: "The counter has been reset");
+ await graphClient?.Me.SendMail.PostAsync(graphEmailMessage);
+ }
+ catch (Exception e)
+ {
+ throw new KernelException($"Something went wrong and couldn't send email - {e}");
+ }
+ }
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/CounterInterceptorStep.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/CounterInterceptorStep.cs
new file mode 100644
index 000000000000..827b346f9232
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/CounterInterceptorStep.cs
@@ -0,0 +1,26 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.SemanticKernel;
+
+namespace ProcessWithCloudEvents.Processes.Steps;
+
+public class CounterInterceptorStep : KernelProcessStep
+{
+ public static class StepFunctions
+ {
+ public const string InterceptCounter = nameof(InterceptCounter);
+ }
+
+ [KernelFunction(StepFunctions.InterceptCounter)]
+ public int? InterceptCounter(int counterStatus)
+ {
+ var multipleOf = 3;
+ if (counterStatus != 0 && counterStatus % multipleOf == 0)
+ {
+ // Only return counter if counter is a multiple of "multipleOf"
+ return counterStatus;
+ }
+
+ return null;
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/CounterStep.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/CounterStep.cs
new file mode 100644
index 000000000000..48738c0bfed4
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/CounterStep.cs
@@ -0,0 +1,71 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.SemanticKernel;
+
+namespace ProcessWithCloudEvents.Processes.Steps;
+
+public class CounterStep : KernelProcessStep
+{
+ public static class StepFunctions
+ {
+ public const string IncreaseCounter = nameof(IncreaseCounter);
+ public const string DecreaseCounter = nameof(DecreaseCounter);
+ public const string ResetCounter = nameof(ResetCounter);
+ }
+
+ public static class OutputEvents
+ {
+ public const string CounterResult = nameof(CounterResult);
+ }
+
+ internal CounterStepState? _state;
+
+ public override ValueTask ActivateAsync(KernelProcessStepState state)
+ {
+ this._state = state.State;
+ return ValueTask.CompletedTask;
+ }
+
+ [KernelFunction(StepFunctions.IncreaseCounter)]
+ public async Task IncreaseCounterAsync(KernelProcessStepContext context)
+ {
+ this._state!.Counter += this._state.CounterIncrements;
+
+ if (this._state!.Counter > 5)
+ {
+ await context.EmitEventAsync(OutputEvents.CounterResult, this._state.Counter);
+ }
+ this._state.LastCounterUpdate = DateTime.UtcNow;
+
+ return this._state.Counter;
+ }
+
+ [KernelFunction(StepFunctions.DecreaseCounter)]
+ public async Task DecreaseCounterAsync(KernelProcessStepContext context)
+ {
+ this._state!.Counter -= this._state.CounterIncrements;
+
+ if (this._state!.Counter > 5)
+ {
+ await context.EmitEventAsync(OutputEvents.CounterResult, this._state.Counter);
+ }
+ this._state.LastCounterUpdate = DateTime.UtcNow;
+
+ return this._state.Counter;
+ }
+
+ [KernelFunction(StepFunctions.ResetCounter)]
+ public async Task ResetCounterAsync(KernelProcessStepContext context)
+ {
+ this._state!.Counter = 0;
+ return this._state.Counter;
+ }
+}
+
+public class CounterStepState
+{
+ public int Counter { get; set; } = 0;
+ public int CounterIncrements { get; set; } = 1;
+
+ public DateTime? LastCounterUpdate { get; set; } = null;
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/SendEmailStep.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/SendEmailStep.cs
new file mode 100644
index 000000000000..92fc6244c925
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/Processes/Steps/SendEmailStep.cs
@@ -0,0 +1,83 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.Graph;
+using Microsoft.Graph.Me.SendMail;
+using Microsoft.SemanticKernel;
+using ProcessWithCloudEvents.MicrosoftGraph;
+
+namespace ProcessWithCloudEvents.Processes.Steps;
+
+public class SendEmailStep : KernelProcessStep
+{
+ public static class OutputEvents
+ {
+ public const string SendEmailSuccess = nameof(SendEmailSuccess);
+ public const string SendEmailFailure = nameof(SendEmailFailure);
+ }
+
+ public static class StepFunctions
+ {
+ public const string SendCounterChangeEmail = nameof(SendCounterChangeEmail);
+ public const string SendCounterResetEmail = nameof(SendCounterResetEmail);
+ }
+
+ public SendEmailStep() { }
+
+ protected SendMailPostRequestBody PopulateMicrosoftGraphMailMessage(object inputData, string emailAddress, string subject)
+ {
+ var message = GraphRequestFactory.CreateEmailBody(
+ subject: $"{subject} - using SK cloud step",
+ content: $"The counter is {(int)inputData}",
+ recipients: [emailAddress]);
+
+ return message;
+ }
+
+ [KernelFunction(StepFunctions.SendCounterChangeEmail)]
+ public async Task PublishCounterChangedEmailMessageAsync(KernelProcessStepContext context, Kernel kernel, object inputData)
+ {
+ if (inputData == null)
+ {
+ return;
+ }
+
+ try
+ {
+ var graphClient = kernel.GetRequiredService();
+ var user = await graphClient.Me.GetAsync();
+ var graphEmailMessage = this.PopulateMicrosoftGraphMailMessage(inputData, user!.Mail!, subject: "The counter has changed");
+ await graphClient.Me.SendMail.PostAsync(graphEmailMessage).ConfigureAwait(false);
+
+ await context.EmitEventAsync(OutputEvents.SendEmailSuccess);
+ }
+ catch (Exception e)
+ {
+ await context.EmitEventAsync(OutputEvents.SendEmailFailure, e, visibility: KernelProcessEventVisibility.Public);
+ throw new KernelException($"Something went wrong and couldn't send email - {e}");
+ }
+ }
+
+ [KernelFunction(StepFunctions.SendCounterResetEmail)]
+ public async Task PublishCounterResetEmailMessageAsync(KernelProcessStepContext context, Kernel kernel, object inputData)
+ {
+ if (inputData == null)
+ {
+ return;
+ }
+
+ try
+ {
+ var graphClient = kernel.GetRequiredService();
+ var user = await graphClient.Me.GetAsync();
+ var graphEmailMessage = this.PopulateMicrosoftGraphMailMessage(inputData, user!.Mail!, subject: "The counter has been reset");
+ await graphClient.Me.SendMail.PostAsync(graphEmailMessage).ConfigureAwait(false);
+
+ await context.EmitEventAsync(OutputEvents.SendEmailSuccess);
+ }
+ catch (Exception e)
+ {
+ await context.EmitEventAsync(OutputEvents.SendEmailFailure, e, visibility: KernelProcessEventVisibility.Public);
+ throw new KernelException($"Something went wrong and couldn't send email - {e}");
+ }
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/Program.cs b/dotnet/samples/Demos/ProcessWithCloudEvents/Program.cs
new file mode 100644
index 000000000000..dae96b88b210
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/Program.cs
@@ -0,0 +1,34 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.Graph;
+using ProcessWithCloudEvents.Controllers;
+
+var builder = WebApplication.CreateBuilder(args);
+builder.Services.AddSingleton(GraphServiceProvider.CreateGraphService());
+
+// For demo purposes making the Counter a singleton so it is not instantiated on every new request
+builder.Services.AddSingleton();
+builder.Services.AddSingleton();
+
+// Add services to the container.
+builder.Services.AddControllers();
+// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
+builder.Services.AddEndpointsApiExplorer();
+builder.Services.AddSwaggerGen();
+
+var app = builder.Build();
+
+// Configure the HTTP request pipeline.
+if (app.Environment.IsDevelopment())
+{
+ app.UseSwagger();
+ app.UseSwaggerUI();
+}
+
+app.UseHttpsRedirection();
+
+app.UseAuthorization();
+
+app.MapControllers();
+
+app.Run();
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/README.md b/dotnet/samples/Demos/ProcessWithCloudEvents/README.md
new file mode 100644
index 000000000000..59c8fcf6bd8f
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/README.md
@@ -0,0 +1,251 @@
+# Process With Cloud Events Demo
+
+This demo contains an ASP.NET core API that showcases the use of cloud events using SK Processes Steps and SK Process with Event Subscribers.
+
+
+For more information about Semantic Kernel Processes, see the following documentation:
+
+## Semantic Kernel Processes
+
+- [Overview of the Process Framework (docs)](https://learn.microsoft.com/semantic-kernel/frameworks/process/process-framework)
+- [Getting Started with Processes (samples)](../../GettingStartedWithProcesses/)
+
+## Demo
+
+### Process: Counter with Cloud Events
+
+#### Steps
+
+##### Counter Step
+
+A simple counter has 3 main functionalities:
+
+- Increase count
+- Decrease count
+- Reset count (set counter to 0)
+
+To achieve this behavior the SK Stateful Step `Processes/Steps/CounterStep.cs` was created.
+On every request it stores that state that can be used to restore the state on the next request.
+
+##### Counter Interceptor Step
+
+This step works as a filter that only passes the counter value if it is a multiple of `multipleOf` else passes a null value.
+
+##### Send Email Step
+
+This step sends an email if receiving a not nullable int value to the same email used on log in.
+
+#### Processes
+
+##### Process With Cloud Steps
+
+```mermaid
+flowchart LR
+ subgraph API
+ ApiIncrease["/increase"]
+ ApiDecrease["/decrease"]
+ ApiReset["/reset"]
+ end
+
+ subgraph process[SK Process]
+ direction LR
+ subgraph counter[Counter Step]
+ increaseCounter[IncreaseCounterAsync
Function]
+ decreaseCounter[DecreaseCounterAsync
Function]
+ resetCounter[ResetCounterAsync
Function]
+ end
+
+ counterInterceptor[Counter
Interceptor
Step]
+
+ subgraph sendEmail[Send Email Step]
+ sendCounterChangedEmail[PublishCounterChangedEmailMessageAsync
Function]
+ sendResetEmail[PublishCounterResetEmailMessageAsync
Function]
+ end
+
+ increaseCounter--> counterInterceptor
+ decreaseCounter--> counterInterceptor
+
+ counterInterceptor-->sendCounterChangedEmail
+ resetCounter-->sendResetEmail
+ end
+
+ ApiIncrease<-->|IncreaseCounterRequest|increaseCounter
+ ApiDecrease<-->|DecreaseCounterRequest|decreaseCounter
+ ApiReset<-->|ResetCounterRequest|resetCounter
+```
+
+Cloud events related logic is encapsulated in a step.
+
+**Breakdown**
+
+- When building the process Kernel used in the SK Process, the cloud event client has to be passed to the Kernel.
+
+- When using `Microsoft Graph`, after completing the [Microsoft Graph Setup](./#microsoft-graph-setup), To achieve the proper setup the following is needed:
+
+ 1. The specific service (`GraphServiceClient` in this case) needs to be added to the Services that are used by the kernel of the process:
+
+ ```C#
+ internal Kernel BuildKernel(GraphServiceClient? graphClient = null)
+ {
+ var builder = Kernel.CreateBuilder();
+ if (graphClient != null)
+ {
+ builder.Services.AddSingleton(graphClient);
+ }
+ return builder.Build();
+ }
+ ```
+ 2. Since now all steps have access to the configured kernel, inside a step, it now can make use of the service by doing:
+ ```C#
+ var graphClient = kernel.GetRequiredService();
+ ```
+
+##### Process With Cloud Process Subscribers
+
+Cloud events related logic is encapsulated in SK Event Subscribers.
+
+```mermaid
+flowchart LR
+ subgraph API
+ ApiIncrease["/increase"]
+ ApiDecrease["/decrease"]
+ ApiReset["/reset"]
+ end
+
+ subgraph process[SK Process - CreateProcessWithProcessSubscriber]
+ direction TB
+ subgraph counter[Counter Step]
+ increaseCounter[IncreaseCounterAsync
Function]
+ decreaseCounter[DecreaseCounterAsync
Function]
+ resetCounter[ResetCounterAsync
Function]
+ end
+ counterInterceptor[Counter
Interceptor
Step]
+
+ increaseCounter--> counterInterceptor
+ decreaseCounter--> counterInterceptor
+ end
+
+ subgraph processInterceptor[SK Process Subscribers - CounterProcessSubscriber]
+ OnCounterResultReceivedAsync
+ OnCounterResetReceivedAsync
+ end
+
+ counterInterceptor-->|OnCounterResult|OnCounterResultReceivedAsync
+ resetCounter-->|OnCounterReset|OnCounterResetReceivedAsync
+
+ ApiIncrease<-->|IncreaseCounterRequest|increaseCounter
+ ApiDecrease<-->|DecreaseCounterRequest|decreaseCounter
+ ApiReset<-->|ResetCounterRequest|resetCounter
+```
+**Breakdown**
+
+- When building the process Kernel used in the SK Process, the cloud event client has to be passed to the Event Subscribers.
+
+- When using `Microsoft Graph`, after completing the [Microsoft Graph Setup](./#microsoft-graph-setup), the Event Subscribers can be linked by doing:
+ 1. Creating an enum that contains the process events of interest.
+ ```C#
+ public enum CounterProcessEvents
+ {
+ IncreaseCounterRequest,
+ DecreaseCounterRequest,
+ ResetCounterRequest,
+ OnCounterReset,
+ OnCounterResult
+ }
+ ```
+ 2. On the existing process, adding which events can be accessed externally using `EmitAsProcessEvent()` and input events can be subscribed to with `OnInputEvent()`:
+ ```C#
+ var processBuilder = new ProcessBuilder("CounterWithProcessSubscriber");
+
+ ...
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.IncreaseCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.IncreaseCounter));
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.DecreaseCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.DecreaseCounter));
+
+ processBuilder
+ .OnInputEvent(CounterProcessEvents.ResetCounterRequest)
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterStep, functionName: CounterStep.StepFunctions.ResetCounter));
+
+ ...
+
+ counterStep
+ .OnFunctionResult(CounterStep.StepFunctions.ResetCounter)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(CounterProcessEvents.OnCounterReset))
+ .SendEventTo(new ProcessFunctionTargetBuilder(counterInterceptorStep));
+
+ counterInterceptorStep
+ .OnFunctionResult(CounterInterceptorStep.StepFunctions.InterceptCounter)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(CounterProcessEvents.OnCounterResult));
+ ```
+ 3. Create a `KernelProcessEventsSubscriber` based class that with the `ProcessEventSubscriber` attributes to link specific process events to specific methods to execute.
+ ```C#
+ public class CounterProcessSubscriber : KernelProcessEventsSubscriber
+ {
+ [ProcessEventSubscriber(CounterProcessEvents.OnCounterReset)]
+ public async Task OnCounterResetReceivedAsync(int? counterResult)
+ {
+ if (!counterResult.HasValue)
+ {
+ return;
+ }
+
+ try
+ {
+ var graphClient = this.ServiceProvider?.GetRequiredService();
+ var user = await graphClient.Me.GetAsync();
+ var graphEmailMessage = this.GenerateEmailRequest(counterResult.Value, user!.Mail!, subject: "The counter has been reset");
+ await graphClient?.Me.SendMail.PostAsync(graphEmailMessage);
+ }
+ catch (Exception e)
+ {
+ throw new KernelException($"Something went wrong and couldn't send email - {e}");
+ }
+ }
+ }
+ ```
+ 4. Link the `KernelProcessEventsSubscriber` based class (example: `CounterProcessSubscriber`) to the process builder.
+ ```C#
+ processBuilder.LinkEventSubscribersFromType(serviceProvider);
+ ```
+
+### Setup
+
+#### Microsoft Graph Setup
+
+##### Create an App Registration in Azure Active Directory
+
+1. Go to the [Azure Portal](https://portal.azure.com/).
+2. Select the Azure Active Directory service.
+3. Select App registrations and click on New registration.
+4. Fill in the required fields and click on Register.
+5. Copy the Application **(client) Id** for later use.
+6. Save Directory **(tenant) Id** for later use..
+7. Click on Certificates & secrets and create a new client secret. (Any name and expiration date will work)
+8. Copy the **client secret** value for later use.
+9. Click on API permissions and add the following permissions:
+ - Microsoft Graph
+ - Delegated permissions
+ - OpenId permissions
+ - email
+ - profile
+ - openid
+ - User.Read
+ - Mail.Send (Necessary for sending emails from your account)
+
+##### Set Secrets using .NET [Secret Manager](https://learn.microsoft.com/en-us/aspnet/core/security/app-secrets)
+
+```powershell
+dotnet user-secrets set "AzureEntraId:TenantId" " ... your tenant id ... "
+dotnet user-secrets set "AzureEntraId:ClientId" " ... your client id ... "
+
+# App Registration Authentication
+dotnet user-secrets set "AzureEntraId:ClientSecret" " ... your client secret ... "
+# OR User Authentication (Interactive)
+dotnet user-secrets set "AzureEntraId:InteractiveBrowserAuthentication" "true"
+dotnet user-secrets set "AzureEntraId:RedirectUri" " ... your redirect uri ... "
+```
\ No newline at end of file
diff --git a/dotnet/samples/Demos/ProcessWithCloudEvents/appsettings.json b/dotnet/samples/Demos/ProcessWithCloudEvents/appsettings.json
new file mode 100644
index 000000000000..b8a90ee3e4dc
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithCloudEvents/appsettings.json
@@ -0,0 +1,23 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.AspNetCore": "Warning"
+ }
+ },
+ "AllowedHosts": "*",
+ "ConnectionStrings": {
+ "MicrosoftGraph.TenantId": "",
+ "MicrosoftGraph.ClientId": ""
+ },
+ "AzureEntraId": {
+ "Instance": "https://login.microsoftonline.com/",
+ "TenantId": "",
+ "ClientId": "",
+ "ClientSecret": "",
+ "InteractiveBrowserAuthentication": true
+ },
+ "MicrosoftGraph": {
+ "BaseUrl": "https://graph.microsoft.com/v1.0"
+ }
+}
diff --git a/dotnet/samples/Demos/ProcessWithDapr/Controllers/ProcessController.cs b/dotnet/samples/Demos/ProcessWithDapr/Controllers/ProcessController.cs
index efbd990cb692..bbfde635f116 100644
--- a/dotnet/samples/Demos/ProcessWithDapr/Controllers/ProcessController.cs
+++ b/dotnet/samples/Demos/ProcessWithDapr/Controllers/ProcessController.cs
@@ -3,6 +3,7 @@
using System.Runtime.Serialization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.SemanticKernel;
+using Microsoft.SemanticKernel.Process;
namespace ProcessWithDapr.Controllers;
@@ -38,10 +39,20 @@ public async Task PostAsync(string processId)
return this.Ok(processId);
}
+ private enum ProcessEvents
+ {
+ StartProcess,
+ OnKickOffEvent,
+ OnStepACompleted,
+ OnStepBCompleted,
+ SharedEvent,
+ OnExitProcess,
+ }
+
private KernelProcess GetProcess()
{
// Create the process builder.
- ProcessBuilder processBuilder = new("ProcessWithDapr");
+ var processBuilder = new ProcessBuilder("ProcessWithDapr");
// Add some steps to the process.
var kickoffStep = processBuilder.AddStepFromType();
@@ -62,29 +73,38 @@ private KernelProcess GetProcess()
// When the kickoff step is finished, trigger both AStep and BStep.
kickoffStep
.OnEvent(CommonEvents.StartARequested)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessEvents.OnKickOffEvent))
.SendEventTo(new ProcessFunctionTargetBuilder(myAStep))
.SendEventTo(new ProcessFunctionTargetBuilder(myBStep));
// When AStep finishes, send its output to CStep.
myAStep
.OnEvent(CommonEvents.AStepDone)
+ //.EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessEvents.OnStepACompleted))
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessEvents.SharedEvent))
.SendEventTo(new ProcessFunctionTargetBuilder(myCStep, parameterName: "astepdata"));
// When BStep finishes, send its output to CStep also.
myBStep
.OnEvent(CommonEvents.BStepDone)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessEvents.OnStepBCompleted))
.SendEventTo(new ProcessFunctionTargetBuilder(myCStep, parameterName: "bstepdata"));
// When CStep has finished without requesting an exit, activate the Kickoff step to start again.
myCStep
.OnEvent(CommonEvents.CStepDone)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessEvents.SharedEvent))
.SendEventTo(new ProcessFunctionTargetBuilder(kickoffStep));
// When the CStep has finished by requesting an exit, stop the process.
myCStep
.OnEvent(CommonEvents.ExitRequested)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessEvents.OnExitProcess))
.StopProcess();
+ // event subscribers
+ processBuilder.LinkEventSubscribersFromType();
+
var process = processBuilder.Build();
return process;
}
@@ -186,6 +206,29 @@ private sealed record CStepState
public int CurrentCycle { get; set; }
}
+ private class CloudEventsSubscribers : KernelProcessEventsSubscriber
+ {
+ [ProcessEventSubscriber(ProcessEvents.OnKickOffEvent, "pubsub", "someotherkickofftopicname")]
+ public void OnKickOff()
+ {
+ }
+
+ //[ProcessEventSubscriber(ProcessEvents.OnStepACompleted, "mypubsub")]
+ //public void OnStepACompleted()
+ //{
+ //}
+
+ [ProcessEventSubscriber(ProcessEvents.OnStepBCompleted, "pubsub")]
+ public void OnStepBCompleted()
+ {
+ }
+
+ [ProcessEventSubscriber(ProcessEvents.SharedEvent, "pubsub", "sharedeventtopic")]
+ public void OnSharedEvent()
+ {
+ }
+ }
+
///
/// Common Events used in the process.
///
diff --git a/dotnet/samples/Demos/ProcessWithDapr/ProcessWithDapr.csproj b/dotnet/samples/Demos/ProcessWithDapr/ProcessWithDapr.csproj
index 69628bbacda5..67aeefb88de5 100644
--- a/dotnet/samples/Demos/ProcessWithDapr/ProcessWithDapr.csproj
+++ b/dotnet/samples/Demos/ProcessWithDapr/ProcessWithDapr.csproj
@@ -17,6 +17,7 @@
+
\ No newline at end of file
diff --git a/dotnet/samples/Demos/ProcessWithDapr/ProcessWithDapr.sln b/dotnet/samples/Demos/ProcessWithDapr/ProcessWithDapr.sln
new file mode 100644
index 000000000000..1b819543cbab
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithDapr/ProcessWithDapr.sln
@@ -0,0 +1,25 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.5.002.0
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ProcessWithDapr", "ProcessWithDapr.csproj", "{1A0F22AF-CA42-4D8E-BE74-83051944F9C2}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {1A0F22AF-CA42-4D8E-BE74-83051944F9C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {1A0F22AF-CA42-4D8E-BE74-83051944F9C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {1A0F22AF-CA42-4D8E-BE74-83051944F9C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {1A0F22AF-CA42-4D8E-BE74-83051944F9C2}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {368AA022-DD05-4F21-BA1D-990B657D8560}
+ EndGlobalSection
+EndGlobal
diff --git a/dotnet/samples/Demos/ProcessWithDapr/Program.cs b/dotnet/samples/Demos/ProcessWithDapr/Program.cs
index 4b2c0bdd2daf..c4c3c940cd52 100644
--- a/dotnet/samples/Demos/ProcessWithDapr/Program.cs
+++ b/dotnet/samples/Demos/ProcessWithDapr/Program.cs
@@ -14,12 +14,15 @@
// Configure the Kernel with DI. This is required for dependency injection to work with processes.
builder.Services.AddKernel();
-// Configure Dapr
+// Configure Dapr Actors
builder.Services.AddActors(static options =>
{
// Register the actors required to run Processes
options.AddProcessActors();
});
+// Configure Dapr Pubsub Client
+//builder.Services.AddControllers().AddDapr();
+builder.Services.AddDaprClient();
builder.Services.AddControllers();
var app = builder.Build();
diff --git a/dotnet/samples/Demos/ProcessWithDapr/README.md b/dotnet/samples/Demos/ProcessWithDapr/README.md
index 9718d5c6f1f1..81755e645131 100644
--- a/dotnet/samples/Demos/ProcessWithDapr/README.md
+++ b/dotnet/samples/Demos/ProcessWithDapr/README.md
@@ -12,12 +12,21 @@ For more information about Semantic Kernel Processes and Dapr, see the following
#### Dapr
- [Dapr documentation](https://docs.dapr.io/)
-- [Dapr Actor documentation](https://v1-10.docs.dapr.io/developing-applications/building-blocks/actors/)
+- [Dapr Actor documentation](https://v1-14.docs.dapr.io/developing-applications/building-blocks/actors/)
- [Dapr local development](https://docs.dapr.io/getting-started/install-dapr-selfhost/)
## Running the Demo
-Before running this Demo, make sure to configure Dapr for local development following the links above. The Dapr containers must be running for this demo application to run.
+### Setup
+Before running this Demo, make sure to configure Dapr for local development following the links above. Then:
+
+1. Make sure `Docker` is running
+2. Initialize Dapr by running `dapr init`
+3. Make sure the Dapr containers are running:
+ - dap_redis
+ - dapr_placement
+ - dapr_scheduler
+ - dapr_zipkin (optional)
```mermaid
flowchart LR
@@ -35,8 +44,10 @@ flowchart LR
End((End))
```
+### Demo
+
1. Build and run the sample. Running the Dapr service locally can be done using the Dapr Cli or with the Dapr VS Code extension. The VS Code extension is the recommended approach if you want to debug the code as it runs.
-1. When the service is up and running, it will expose a single API in localhost port 5000.
+2. When the service is up and running, it will expose a single API in localhost port 5000.
#### Invoking the process:
diff --git a/dotnet/samples/Demos/ProcessWithDapr/components/pubsub.yaml b/dotnet/samples/Demos/ProcessWithDapr/components/pubsub.yaml
new file mode 100644
index 000000000000..18764d8ce0bb
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithDapr/components/pubsub.yaml
@@ -0,0 +1,12 @@
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+ name: pubsub
+spec:
+ type: pubsub.redis
+ version: v1
+ metadata:
+ - name: redisHost
+ value: localhost:6379
+ - name: redisPassword
+ value: ""
diff --git a/dotnet/samples/Demos/ProcessWithDapr/components/statestore.yaml b/dotnet/samples/Demos/ProcessWithDapr/components/statestore.yaml
new file mode 100644
index 000000000000..2f676bff8025
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithDapr/components/statestore.yaml
@@ -0,0 +1,14 @@
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+ name: statestore
+spec:
+ type: state.redis
+ version: v1
+ metadata:
+ - name: redisHost
+ value: localhost:6379
+ - name: redisPassword
+ value: ""
+ - name: actorStateStore
+ value: "true"
diff --git a/dotnet/samples/Demos/ProcessWithDapr/processWithDapr.http b/dotnet/samples/Demos/ProcessWithDapr/processWithDapr.http
new file mode 100644
index 000000000000..744be4d9c431
--- /dev/null
+++ b/dotnet/samples/Demos/ProcessWithDapr/processWithDapr.http
@@ -0,0 +1,5 @@
+# For more info on HTTP files go to https://aka.ms/vs/httpfile
+@ProcessWithDapr_HostAddress = http://localhost:58641
+
+GET {{ProcessWithDapr_HostAddress}}/processes/6
+Accept: application/json
\ No newline at end of file
diff --git a/dotnet/samples/GettingStartedWithProcesses/README.md b/dotnet/samples/GettingStartedWithProcesses/README.md
index 107a97afc319..7b38d693ee7d 100644
--- a/dotnet/samples/GettingStartedWithProcesses/README.md
+++ b/dotnet/samples/GettingStartedWithProcesses/README.md
@@ -25,6 +25,7 @@ Example|Description
[Step01_Processes](https://github.com/microsoft/semantic-kernel/blob/main/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs)|How to create a simple process with a loop and a conditional exit
[Step02a_AccountOpening](https://github.com/microsoft/semantic-kernel/blob/main/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs)|Showcasing processes cycles, fan in, fan out for opening an account.
[Step02b_AccountOpening](https://github.com/microsoft/semantic-kernel/blob/main/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs)|How to refactor processes and make use of smaller processes as steps in larger processes.
+[Step02c_AccountOpening](https://github.com/microsoft/semantic-kernel/blob/main/dotnet/samples/GettingStartedWithProcesses/Step02/Step02c_AccountOpening.cs)|How to refactor processes and make use of SK Event Subscribers.
[Step03a_FoodPreparation](https://github.com/microsoft/semantic-kernel/blob/main/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs)|Showcasing reuse of steps, creation of processes, spawning of multiple events, use of stateful steps with food preparation samples.
[Step03b_FoodOrdering](https://github.com/microsoft/semantic-kernel/blob/main/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs)|Showcasing use of subprocesses as steps, spawning of multiple events conditionally reusing the food preparation samples.
[Step04_AgentOrchestration](https://github.com/microsoft/semantic-kernel/blob/main/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs)|Showcasing use of process steps in conjunction with the _Agent Framework_.
@@ -50,7 +51,7 @@ flowchart LR
### Step02_AccountOpening
-The account opening sample has 2 different implementations covering the same scenario, it just uses different SK components to achieve the same goal.
+The account opening sample has 3 different implementations covering the same scenario, it just uses different SK components to achieve the same goal.
In addition, the sample introduces the concept of using smaller process as steps to maintain the main process readable and manageble for future improvements and unit testing.
Also introduces the use of SK Event Subscribers.
@@ -176,6 +177,47 @@ graph LR
```
+#### Step02c_AccountOpeningWithCloudEvents
+
+An additional optimization that could be made to the Account Creation sample, is to make use of SK Event subscriber to isolate logic that has to do with cloud events.
+In this sample, the cloud event logic is mocked by the Mail Service functionality, which mocks sending an email to the user in different circumstances:
+
+- When new user credit score check fails
+- When new user fraud detection fails
+- When a new account was created successfully after passing all checks and creation steps
+
+When using SK Event subscribers, specific process events when triggered will emit the event data externally to
+any subscribers linked to specific events.
+
+```mermaid
+graph LR
+ subgraph EventSubscribers[SK Event Subscribers]
+ OnSendMailDueCreditCheckFailure[OnSendMailDueCredit
CheckFailure]
+ OnSendMailDueFraudCheckFailure[OnSendMailDueFraud
CheckFailure]
+ OnSendMailWithNewAccountInfo[OnSendMailWith
NewAccountInfo]
+ end
+
+ subgraph Process[SK Process]
+ direction LR
+ User(User)
+ FillForm(Chat With User
to Fill New
Customer Form)
+ NewAccountVerification[[New Account Verification
Process]]
+ NewAccountCreation[[New Account Creation
Process]]
+
+ User<-->|Provides user details|FillForm
+ FillForm-->|New User Form|NewAccountVerification-->|Account Verification
Succeeded|NewAccountCreation
+ end
+
+ NewAccountVerification-->|Account Credit Check
Failed|OnSendMailDueCreditCheckFailure
+ NewAccountVerification-->|Account Fraud Detection
Failed|OnSendMailDueFraudCheckFailure
+ NewAccountCreation-->|Account Creation
Succeeded|OnSendMailWithNewAccountInfo
+
+```
+Creating a separation with SK Process when using cloud events (even though in this sample it's a mock of a Mailer), it is useful since
+it can help to isolate additional logic related to authentication, use of additional frameworks, etc.
+
+For a more realistic sample of SK Process emitting real cloud events check out the [`ProcessWithCloudEvents` Demo](../Demos/ProcessWithCloudEvents/README.md).
+
### Step03a_FoodPreparation
This tutorial contains a set of food recipes associated with the Food Preparation Processes of a restaurant.
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/EventSubscribers/AccountOpeningEventSubscribers.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/EventSubscribers/AccountOpeningEventSubscribers.cs
new file mode 100644
index 000000000000..7961f84cdf1a
--- /dev/null
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/EventSubscribers/AccountOpeningEventSubscribers.cs
@@ -0,0 +1,41 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.SemanticKernel.Process;
+using Step02.Models;
+
+namespace Step02.EventSubscribers;
+///
+/// The SK Process Event Subscribers can link to specific Process Events triggered
+/// by making use of the
+///
+public class AccountOpeningEventSubscribers : KernelProcessEventsSubscriber
+{
+ [ProcessEventSubscriber(AccountOpeningEvents.NewAccountOpeningEvents.OnNewUserCreditCheckFailed)]
+ public Task OnSendMailDueCreditCheckFailure(string message)
+ {
+ this.MockSendEmail("New Account Failure [CREDIT]", message);
+ return Task.CompletedTask;
+ }
+
+ [ProcessEventSubscriber(AccountOpeningEvents.NewAccountOpeningEvents.OnNewUserFraudCheckFailed)]
+ public Task OnSendMailDueFraudCheckFailure(string message)
+ {
+ this.MockSendEmail("New Account Failure [FRAUD]", message);
+ return Task.CompletedTask;
+ }
+
+ [ProcessEventSubscriber(AccountOpeningEvents.NewAccountOpeningEvents.AccountCreatedSuccessfully)]
+ public Task OnSendMailWithNewAccountInfo(string message)
+ {
+ this.MockSendEmail("Welcome! You have a new account", message);
+ return Task.CompletedTask;
+ }
+
+ private void MockSendEmail(string subject, string message)
+ {
+ Console.WriteLine("======== MAIL SERVICE (via SK Event Subscribers) ========");
+ Console.WriteLine($"SUBJECT: {subject}");
+ Console.WriteLine(message);
+ Console.WriteLine("=========================================================");
+ }
+}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Models/AccountOpeningEvents.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Models/AccountOpeningEvents.cs
index eda9fc8d4ea3..f6ba4158a19b 100644
--- a/dotnet/samples/GettingStartedWithProcesses/Step02/Models/AccountOpeningEvents.cs
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Models/AccountOpeningEvents.cs
@@ -32,4 +32,19 @@ public static class AccountOpeningEvents
public static readonly string WelcomePacketCreated = nameof(WelcomePacketCreated);
public static readonly string MailServiceSent = nameof(MailServiceSent);
+
+ // For now, root process events should match internal subprocess event names to be able to use
+ // SK Event Subscribers
+ public enum NewAccountOpeningEvents
+ {
+ StartOpeningAccount,
+ OnNewUserFraudCheckFailed,
+ OnNewUserCreditCheckFailed,
+ AccountCreatedSuccessfully,
+ }
+
+ public static string GetEventName(NewAccountOpeningEvents processEvent)
+ {
+ return Enum.GetName(processEvent) ?? "";
+ }
}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Processes/NewAccountCreationProcess.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Processes/NewAccountCreationProcess.cs
index 7e96b9544d28..8c2a3a8c3f48 100644
--- a/dotnet/samples/GettingStartedWithProcesses/Step02/Processes/NewAccountCreationProcess.cs
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Processes/NewAccountCreationProcess.cs
@@ -14,9 +14,33 @@ namespace Step02.Processes;
///
public static class NewAccountCreationProcess
{
- public static ProcessBuilder CreateProcess()
+ ///
+ /// Process events allow to only expose the relevant events that have interactions with external components to this process as input or output events.
+ /// This way when using the process as a step in another process it can be seen as:
+ ///
+ /// PROCESS INPUTS PROCESS OUTPUTS
+ /// ┌─────────┐
+ /// OnNewCustomerFormCompleted ─────►│ │
+ /// │ │
+ /// OnCustomerTranscriptReady ──────►│ Process │───► AccountCreatedSuccessfully
+ /// │ │
+ /// OnNewAccountVerificationPassed ─►│ │
+ /// └─────────┘
+ ///
+ ///
+ public enum ProcessEvents
{
- ProcessBuilder process = new("AccountCreationProcess");
+ // Process Input Events
+ OnNewCustomerFormCompleted,
+ OnCustomerTranscriptReady,
+ OnNewAccountVerificationPassed,
+ // Process Output Events
+ AccountCreatedSuccessfully
+ }
+
+ public static ProcessBuilder CreateProcess()
+ {
+ var process = new ProcessBuilder("AccountCreationProcess");
var coreSystemRecordCreationStep = process.AddStepFromType();
var marketingRecordCreationStep = process.AddStepFromType();
@@ -25,18 +49,18 @@ public static ProcessBuilder CreateProcess()
// When the newCustomerForm is completed...
process
- .OnInputEvent(AccountOpeningEvents.NewCustomerFormCompleted)
+ .OnInputEvent(ProcessEvents.OnNewCustomerFormCompleted)
// The information gets passed to the core system record creation step
.SendEventTo(new ProcessFunctionTargetBuilder(coreSystemRecordCreationStep, functionName: NewAccountStep.Functions.CreateNewAccount, parameterName: "customerDetails"));
// When the newCustomerForm is completed, the user interaction transcript with the user is passed to the core system record creation step
process
- .OnInputEvent(AccountOpeningEvents.CustomerInteractionTranscriptReady)
+ .OnInputEvent(ProcessEvents.OnCustomerTranscriptReady)
.SendEventTo(new ProcessFunctionTargetBuilder(coreSystemRecordCreationStep, functionName: NewAccountStep.Functions.CreateNewAccount, parameterName: "interactionTranscript"));
// When the fraudDetectionCheck step passes, the information gets to core system record creation step to kickstart this step
process
- .OnInputEvent(AccountOpeningEvents.NewAccountVerificationCheckPassed)
+ .OnInputEvent(ProcessEvents.OnNewAccountVerificationPassed)
.SendEventTo(new ProcessFunctionTargetBuilder(coreSystemRecordCreationStep, functionName: NewAccountStep.Functions.CreateNewAccount, parameterName: "previousCheckSucceeded"));
// When the coreSystemRecordCreation step successfully creates a new accountId, it will trigger the creation of a new marketing entry through the marketingRecordCreation step
@@ -65,6 +89,10 @@ public static ProcessBuilder CreateProcess()
.OnEvent(AccountOpeningEvents.CRMRecordInfoEntryCreated)
.SendEventTo(new ProcessFunctionTargetBuilder(welcomePacketStep, parameterName: "crmRecordCreated"));
+ welcomePacketStep
+ .OnEvent(AccountOpeningEvents.WelcomePacketCreated)
+ .EmitAsProcessEvent(process.GetProcessEvent(ProcessEvents.AccountCreatedSuccessfully));
+
return process;
}
}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Processes/NewAccountVerificationProcess.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Processes/NewAccountVerificationProcess.cs
index e4184a71bd1e..cefae1d9630b 100644
--- a/dotnet/samples/GettingStartedWithProcesses/Step02/Processes/NewAccountVerificationProcess.cs
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Processes/NewAccountVerificationProcess.cs
@@ -14,16 +14,40 @@ namespace Step02.Processes;
///
public static class NewAccountVerificationProcess
{
- public static ProcessBuilder CreateProcess()
+ ///
+ /// Process events allow to only expose the relevant events that have interactions with external components to this process as input or output events.
+ /// This way when using the process as a step in another process it can be seen as:
+ ///
+ /// PROCESS INPUTS PROCESS OUTPUTS
+ /// ┌─────────┐
+ /// │ │───► OnNewUserFraudCheckFailed
+ /// │ │
+ /// OnNewCustomerFormCompleted ──►│ Process │───► OnNewAccountVerificationSucceeded
+ /// │ │
+ /// │ │───► OnNewUserCreditCheckFailed
+ /// └─────────┘
+ ///
+ ///
+ public enum ProcessEvents
{
- ProcessBuilder process = new("AccountVerificationProcess");
+ // Process Input Events
+ OnNewCustomerFormCompleted,
+ // Process Output Events
+ OnNewAccountVerificationSucceeded,
+ OnNewUserFraudCheckFailed,
+ OnNewUserCreditCheckFailed
+ }
+
+ public static ProcessBuilder CreateProcess()
+ {
+ var process = new ProcessBuilder("AccountVerificationProcess");
var customerCreditCheckStep = process.AddStepFromType();
var fraudDetectionCheckStep = process.AddStepFromType();
// When the newCustomerForm is completed...
process
- .OnInputEvent(AccountOpeningEvents.NewCustomerFormCompleted)
+ .OnInputEvent(ProcessEvents.OnNewCustomerFormCompleted)
// The information gets passed to the core system record creation step
.SendEventTo(new ProcessFunctionTargetBuilder(customerCreditCheckStep, functionName: CreditScoreCheckStep.Functions.DetermineCreditScore, parameterName: "customerDetails"))
// The information gets passed to the fraud detection step for validation
@@ -34,6 +58,18 @@ public static ProcessBuilder CreateProcess()
.OnEvent(AccountOpeningEvents.CreditScoreCheckApproved)
.SendEventTo(new ProcessFunctionTargetBuilder(fraudDetectionCheckStep, functionName: FraudDetectionStep.Functions.FraudDetectionCheck, parameterName: "previousCheckSucceeded"));
+ customerCreditCheckStep
+ .OnEvent(AccountOpeningEvents.CreditScoreCheckRejected)
+ .EmitAsProcessEvent(process.GetProcessEvent(ProcessEvents.OnNewUserCreditCheckFailed));
+
+ fraudDetectionCheckStep
+ .OnEvent(AccountOpeningEvents.FraudDetectionCheckPassed)
+ .EmitAsProcessEvent(process.GetProcessEvent(ProcessEvents.OnNewAccountVerificationSucceeded));
+
+ fraudDetectionCheckStep
+ .OnEvent(AccountOpeningEvents.FraudDetectionCheckFailed)
+ .EmitAsProcessEvent(process.GetProcessEvent(ProcessEvents.OnNewUserFraudCheckFailed));
+
return process;
}
}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs
index 1564dc679eec..d0ea986227ab 100644
--- a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs
@@ -5,6 +5,7 @@
using SharedSteps;
using Step02.Models;
using Step02.Steps;
+using Step02.Utils;
namespace Step02;
@@ -143,9 +144,14 @@ private KernelProcess SetupAccountOpeningProcess() where TUserIn
[Fact]
public async Task UseAccountOpeningProcessSuccessfulInteractionAsync()
{
+ // Arrange
Kernel kernel = CreateKernelWithChatCompletion();
KernelProcess kernelProcess = SetupAccountOpeningProcess();
- using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Act
+ var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Assert
+ var processInfo = await runningProcess.GetStateAsync();
+ AccountOpeningAsserts.AssertAccountOpeningSuccessMailMessage(processInfo, nameof(MailServiceStep));
}
///
@@ -154,9 +160,14 @@ public async Task UseAccountOpeningProcessSuccessfulInteractionAsync()
[Fact]
public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync()
{
+ // Arrange
Kernel kernel = CreateKernelWithChatCompletion();
KernelProcess kernelProcess = SetupAccountOpeningProcess();
- using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Act
+ var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Assert
+ var processInfo = await runningProcess.GetStateAsync();
+ AccountOpeningAsserts.AssertAccountOpeningFailDueCreditScoreMailMessage(processInfo, nameof(MailServiceStep));
}
///
@@ -165,8 +176,13 @@ public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync()
[Fact]
public async Task UseAccountOpeningProcessFailureDueToFraudFailureAsync()
{
+ // Arrange
Kernel kernel = CreateKernelWithChatCompletion();
KernelProcess kernelProcess = SetupAccountOpeningProcess();
- using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Act
+ var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Assert
+ var processInfo = await runningProcess.GetStateAsync();
+ AccountOpeningAsserts.AssertAccountOpeningFailDueFraudMailMessage(processInfo, nameof(MailServiceStep));
}
}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs
index b14b659cd20f..10adc8caffe4 100644
--- a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs
@@ -6,6 +6,7 @@
using Step02.Models;
using Step02.Processes;
using Step02.Steps;
+using Step02.Utils;
namespace Step02;
@@ -13,7 +14,7 @@ namespace Step02;
/// Demonstrate creation of and
/// eliciting its response to five explicit user messages.
/// For each test there is a different set of user messages that will cause different steps to be triggered using the same pipeline.
-/// For visual reference of the process check the diagram.
+/// For visual reference of the process check the diagram.
///
public class Step02b_AccountOpening(ITestOutputHelper output) : BaseTest(output, redirectSystemConsoleOutput: true)
{
@@ -27,8 +28,8 @@ private KernelProcess SetupAccountOpeningProcess() where TUserIn
var userInputStep = process.AddStepFromType();
var displayAssistantMessageStep = process.AddStepFromType();
- var accountVerificationStep = process.AddStepFromProcess(NewAccountVerificationProcess.CreateProcess());
- var accountCreationStep = process.AddStepFromProcess(NewAccountCreationProcess.CreateProcess());
+ var accountVerificationStep = (ProcessBuilder)process.AddStepFromProcess(NewAccountVerificationProcess.CreateProcess());
+ var accountCreationStep = (ProcessBuilder)process.AddStepFromProcess(NewAccountCreationProcess.CreateProcess());
var mailServiceStep = process.AddStepFromType();
@@ -65,33 +66,36 @@ private KernelProcess SetupAccountOpeningProcess() where TUserIn
newCustomerFormStep
.OnEvent(AccountOpeningEvents.NewCustomerFormCompleted)
// The information gets passed to the account verificatino step
- .SendEventTo(accountVerificationStep.WhereInputEventIs(AccountOpeningEvents.NewCustomerFormCompleted))
+ .SendEventTo(accountVerificationStep.WhereInputEventIs(NewAccountVerificationProcess.ProcessEvents.OnNewCustomerFormCompleted))
// The information gets passed to the validation process step
- .SendEventTo(accountCreationStep.WhereInputEventIs(AccountOpeningEvents.NewCustomerFormCompleted));
+ .SendEventTo(accountCreationStep.WhereInputEventIs(NewAccountCreationProcess.ProcessEvents.OnNewCustomerFormCompleted));
// When the newCustomerForm is completed, the user interaction transcript with the user is passed to the core system record creation step
newCustomerFormStep
.OnEvent(AccountOpeningEvents.CustomerInteractionTranscriptReady)
- .SendEventTo(accountCreationStep.WhereInputEventIs(AccountOpeningEvents.CustomerInteractionTranscriptReady));
+ .SendEventTo(accountCreationStep.WhereInputEventIs(NewAccountCreationProcess.ProcessEvents.OnCustomerTranscriptReady));
// When the creditScoreCheck step results in Rejection, the information gets to the mailService step to notify the user about the state of the application and the reasons
accountVerificationStep
- .OnEvent(AccountOpeningEvents.CreditScoreCheckRejected)
+ // .OnEvent(AccountOpeningEvents.CreditScoreCheckRejected) // if using OnEvent the event name must match exactly the name emitted by the inner step
+ .OnProcessEvent(NewAccountVerificationProcess.ProcessEvents.OnNewUserCreditCheckFailed)
.SendEventTo(new ProcessFunctionTargetBuilder(mailServiceStep));
// When the fraudDetectionCheck step fails, the information gets to the mailService step to notify the user about the state of the application and the reasons
accountVerificationStep
- .OnEvent(AccountOpeningEvents.FraudDetectionCheckFailed)
+ // .OnEvent(AccountOpeningEvents.FraudDetectionCheckFailed) // if using OnEvent the event name must match exactly the name emitted by the inner step
+ .OnProcessEvent(NewAccountVerificationProcess.ProcessEvents.OnNewUserFraudCheckFailed)
.SendEventTo(new ProcessFunctionTargetBuilder(mailServiceStep));
// When the fraudDetectionCheck step passes, the information gets to core system record creation step to kickstart this step
accountVerificationStep
- .OnEvent(AccountOpeningEvents.FraudDetectionCheckPassed)
- .SendEventTo(accountCreationStep.WhereInputEventIs(AccountOpeningEvents.NewAccountVerificationCheckPassed));
+ // .OnEvent(AccountOpeningEvents.FraudDetectionCheckPassed) // if using OnEvent the event name must match exactly the name emitted by the inner step
+ .OnProcessEvent(NewAccountVerificationProcess.ProcessEvents.OnNewAccountVerificationSucceeded)
+ .SendEventTo(accountCreationStep.WhereInputEventIs(NewAccountCreationProcess.ProcessEvents.OnNewAccountVerificationPassed));
// After crmRecord and marketing gets created, a welcome packet is created to then send information to the user with the mailService step
accountCreationStep
- .OnEvent(AccountOpeningEvents.WelcomePacketCreated)
+ .OnProcessEvent(NewAccountCreationProcess.ProcessEvents.AccountCreatedSuccessfully)
.SendEventTo(new ProcessFunctionTargetBuilder(mailServiceStep));
// All possible paths end up with the user being notified about the account creation decision throw the mailServiceStep completion
@@ -110,9 +114,14 @@ private KernelProcess SetupAccountOpeningProcess() where TUserIn
[Fact]
public async Task UseAccountOpeningProcessSuccessfulInteractionAsync()
{
+ // Arrange
Kernel kernel = CreateKernelWithChatCompletion();
KernelProcess kernelProcess = SetupAccountOpeningProcess();
- using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Act
+ var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Assert
+ var processInfo = await runningProcess.GetStateAsync();
+ AccountOpeningAsserts.AssertAccountOpeningSuccessMailMessage(processInfo, nameof(MailServiceStep));
}
///
@@ -121,9 +130,14 @@ public async Task UseAccountOpeningProcessSuccessfulInteractionAsync()
[Fact]
public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync()
{
+ // Arrange
Kernel kernel = CreateKernelWithChatCompletion();
KernelProcess kernelProcess = SetupAccountOpeningProcess();
- using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Act
+ var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Assert
+ var processInfo = await runningProcess.GetStateAsync();
+ AccountOpeningAsserts.AssertAccountOpeningFailDueCreditScoreMailMessage(processInfo, nameof(MailServiceStep));
}
///
@@ -132,8 +146,13 @@ public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync()
[Fact]
public async Task UseAccountOpeningProcessFailureDueToFraudFailureAsync()
{
+ // Arrange
Kernel kernel = CreateKernelWithChatCompletion();
KernelProcess kernelProcess = SetupAccountOpeningProcess();
- using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Act
+ var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null });
+ // Assert
+ var processInfo = await runningProcess.GetStateAsync();
+ AccountOpeningAsserts.AssertAccountOpeningFailDueFraudMailMessage(processInfo, nameof(MailServiceStep));
}
}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02c_AccountOpening.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02c_AccountOpening.cs
new file mode 100644
index 000000000000..516cbf5f7516
--- /dev/null
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02c_AccountOpening.cs
@@ -0,0 +1,148 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Events;
+using Microsoft.SemanticKernel;
+using SharedSteps;
+using Step02.EventSubscribers;
+using Step02.Models;
+using Step02.Processes;
+using Step02.Steps;
+using static Step02.Models.AccountOpeningEvents;
+
+namespace Step02;
+
+///
+/// Demonstrate creation of and
+/// eliciting its response to five explicit user messages.
+/// For each test there is a different set of user messages that will cause different steps to be triggered using the same pipeline.
+/// For visual reference of the process check the diagram.
+///
+public class Step02c_AccountOpening(ITestOutputHelper output) : BaseTest(output, redirectSystemConsoleOutput: true)
+{
+ // Target Open AI Services
+ protected override bool ForceOpenAI => true;
+
+ private KernelProcess SetupAccountOpeningProcess() where TUserInputStep : ScriptedUserInputStep
+ {
+ var process = new ProcessBuilder("AccountOpeningProcessWithSubprocessesAndEventSubscribers");
+ var newCustomerFormStep = process.AddStepFromType();
+ var userInputStep = process.AddStepFromType();
+ var displayAssistantMessageStep = process.AddStepFromType();
+
+ var accountVerificationStep = (ProcessBuilder)process.AddStepFromProcess(NewAccountVerificationProcess.CreateProcess());
+ var accountCreationStep = (ProcessBuilder)process.AddStepFromProcess(NewAccountCreationProcess.CreateProcess());
+
+ process
+ .OnInputEvent(NewAccountOpeningEvents.StartOpeningAccount)
+ .SendEventTo(new ProcessFunctionTargetBuilder(newCustomerFormStep, CompleteNewCustomerFormStep.Functions.NewAccountWelcome));
+
+ // When the welcome message is generated, send message to displayAssistantMessageStep
+ newCustomerFormStep
+ .OnEvent(AccountOpeningEvents.NewCustomerFormWelcomeMessageComplete)
+ .SendEventTo(new ProcessFunctionTargetBuilder(displayAssistantMessageStep, DisplayAssistantMessageStep.Functions.DisplayAssistantMessage));
+
+ // When the userInput step emits a user input event, send it to the newCustomerForm step
+ // Function names are necessary when the step has multiple public functions like CompleteNewCustomerFormStep: NewAccountWelcome and NewAccountProcessUserInfo
+ userInputStep
+ .OnEvent(CommonEvents.UserInputReceived)
+ .SendEventTo(new ProcessFunctionTargetBuilder(newCustomerFormStep, CompleteNewCustomerFormStep.Functions.NewAccountProcessUserInfo, "userMessage"));
+
+ userInputStep
+ .OnEvent(CommonEvents.Exit)
+ .StopProcess();
+
+ // When the newCustomerForm step emits needs more details, send message to displayAssistantMessage step
+ newCustomerFormStep
+ .OnEvent(AccountOpeningEvents.NewCustomerFormNeedsMoreDetails)
+ .SendEventTo(new ProcessFunctionTargetBuilder(displayAssistantMessageStep, DisplayAssistantMessageStep.Functions.DisplayAssistantMessage));
+
+ // After any assistant message is displayed, user input is expected to the next step is the userInputStep
+ displayAssistantMessageStep
+ .OnEvent(CommonEvents.AssistantResponseGenerated)
+ .SendEventTo(new ProcessFunctionTargetBuilder(userInputStep, ScriptedUserInputStep.Functions.GetUserInput));
+
+ // When the newCustomerForm is completed...
+ newCustomerFormStep
+ .OnEvent(AccountOpeningEvents.NewCustomerFormCompleted)
+ // The information gets passed to the account verificatino step
+ .SendEventTo(accountVerificationStep.WhereInputEventIs(NewAccountVerificationProcess.ProcessEvents.OnNewCustomerFormCompleted))
+ // The information gets passed to the validation process step
+ .SendEventTo(accountCreationStep.WhereInputEventIs(NewAccountCreationProcess.ProcessEvents.OnNewCustomerFormCompleted));
+
+ // When the newCustomerForm is completed, the user interaction transcript with the user is passed to the core system record creation step
+ newCustomerFormStep
+ .OnEvent(AccountOpeningEvents.CustomerInteractionTranscriptReady)
+ .SendEventTo(accountCreationStep.WhereInputEventIs(NewAccountCreationProcess.ProcessEvents.OnCustomerTranscriptReady));
+
+ // When the creditScoreCheck step results in Rejection, the information gets to the mailService step to notify the user about the state of the application and the reasons
+ accountVerificationStep
+ .OnProcessEvent(NewAccountVerificationProcess.ProcessEvents.OnNewUserCreditCheckFailed)
+ .EmitAsProcessEvent(process.GetProcessEvent(NewAccountOpeningEvents.OnNewUserCreditCheckFailed))
+ .StopProcess();
+
+ // When the fraudDetectionCheck step fails, the information gets to the mailService step to notify the user about the state of the application and the reasons
+ // Sample of bubbling up nested ProcessEvents
+ accountVerificationStep
+ .OnProcessEvent(NewAccountVerificationProcess.ProcessEvents.OnNewUserFraudCheckFailed)
+ .EmitAsProcessEvent(process.GetProcessEvent(NewAccountOpeningEvents.OnNewUserFraudCheckFailed))
+ .StopProcess();
+
+ // When the fraudDetectionCheck step passes, the information gets to core system record creation step to kickstart this step
+ accountVerificationStep
+ .OnProcessEvent(NewAccountVerificationProcess.ProcessEvents.OnNewAccountVerificationSucceeded)
+ .SendEventTo(accountCreationStep.WhereInputEventIs(NewAccountCreationProcess.ProcessEvents.OnNewAccountVerificationPassed));
+
+ // After crmRecord and marketing gets created, a welcome packet is created to then send information to the user with the mailService step
+ // Sample of bubbling up nested ProcessEvents
+ accountCreationStep
+ .OnProcessEvent(NewAccountCreationProcess.ProcessEvents.AccountCreatedSuccessfully)
+ .EmitAsProcessEvent(process.GetProcessEvent(NewAccountOpeningEvents.AccountCreatedSuccessfully))
+ .StopProcess();
+
+ // Linking NewAccountOpeningEmailEvents subscriber to process
+ process.LinkEventSubscribersFromType();
+
+ KernelProcess kernelProcess = process.Build();
+
+ return kernelProcess;
+ }
+
+ ///
+ /// This test uses a specific userId and DOB that makes the creditScore and Fraud detection to pass
+ ///
+ [Fact]
+ public async Task UseAccountOpeningProcessSuccessfulInteractionAsync()
+ {
+ // Arrange
+ Kernel kernel = CreateKernelWithChatCompletion();
+ KernelProcess kernelProcess = SetupAccountOpeningProcess();
+ // Act
+ using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.GetEventName(NewAccountOpeningEvents.StartOpeningAccount), Data = null });
+ }
+
+ ///
+ /// This test uses a specific DOB that makes the creditScore to fail
+ ///
+ [Fact]
+ public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync()
+ {
+ // Arrange
+ Kernel kernel = CreateKernelWithChatCompletion();
+ KernelProcess kernelProcess = SetupAccountOpeningProcess();
+ // Act
+ using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.GetEventName(NewAccountOpeningEvents.StartOpeningAccount), Data = null });
+ }
+
+ ///
+ /// This test uses a specific userId that makes the fraudDetection to fail
+ ///
+ [Fact]
+ public async Task UseAccountOpeningProcessFailureDueToFraudFailureAsync()
+ {
+ // Arrange
+ Kernel kernel = CreateKernelWithChatCompletion();
+ KernelProcess kernelProcess = SetupAccountOpeningProcess();
+ // Act
+ using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.GetEventName(NewAccountOpeningEvents.StartOpeningAccount), Data = null });
+ }
+}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Steps/MailServiceStep.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Steps/MailServiceStep.cs
index b11f782cb201..63936f36feb1 100644
--- a/dotnet/samples/GettingStartedWithProcesses/Step02/Steps/MailServiceStep.cs
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Steps/MailServiceStep.cs
@@ -8,16 +8,25 @@ namespace Step02.Steps;
///
/// Mock step that emulates Mail Service with a message for the user.
///
-public class MailServiceStep : KernelProcessStep
+public class MailServiceStep : KernelProcessStep
{
public static class Functions
{
public const string SendMailToUserWithDetails = nameof(SendMailToUserWithDetails);
}
+ private MailServiceState? _state;
+
+ public override ValueTask ActivateAsync(KernelProcessStepState state)
+ {
+ this._state = state.State;
+ return default;
+ }
+
[KernelFunction(Functions.SendMailToUserWithDetails)]
public async Task SendMailServiceAsync(KernelProcessStepContext context, string message)
{
+ this._state!.LastMessageSent = message;
Console.WriteLine("======== MAIL SERVICE ======== ");
Console.WriteLine(message);
Console.WriteLine("============================== ");
@@ -25,3 +34,8 @@ public async Task SendMailServiceAsync(KernelProcessStepContext context, string
await context.EmitEventAsync(new() { Id = AccountOpeningEvents.MailServiceSent, Data = message });
}
}
+
+public class MailServiceState
+{
+ public string? LastMessageSent = null;
+}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Steps/WelcomePacketStep.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Steps/WelcomePacketStep.cs
index 3f9349f5eeb3..720dbbc3596b 100644
--- a/dotnet/samples/GettingStartedWithProcesses/Step02/Steps/WelcomePacketStep.cs
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Steps/WelcomePacketStep.cs
@@ -42,7 +42,8 @@ await context.EmitEventAsync(new()
{
Id = AccountOpeningEvents.WelcomePacketCreated,
Data = mailMessage,
- Visibility = KernelProcessEventVisibility.Public,
+ // When using ProcessBuilder<> with EmitAsProcessEvent there is no need of marking the event as public
+ //Visibility = KernelProcessEventVisibility.Public,
});
}
}
diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Utils/AccountOpeningAsserts.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Utils/AccountOpeningAsserts.cs
new file mode 100644
index 000000000000..bb49b3065f13
--- /dev/null
+++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Utils/AccountOpeningAsserts.cs
@@ -0,0 +1,57 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using Microsoft.SemanticKernel;
+using Step02.Steps;
+
+namespace Step02.Utils;
+public static class AccountOpeningAsserts
+{
+ private const string ExpectedAccountSuccessMessage = """
+ Dear John Contoso
+ We are thrilled to inform you that you have successfully created a new PRIME ABC Account with us!
+
+ Account Details:
+ Account Number: 00000000-0000-0000-0000-000000000000
+ Account Type: PrimeABC
+
+ Please keep this confidential for security purposes.
+
+ Here is the contact information we have in file:
+
+ Email: john.contoso@contoso.com
+ Phone: (222)-222-1234
+
+ Thank you for opening an account with us!
+ """;
+
+ private const string ExpectedFailedAccountDueFraudDetectionMessage = """
+ We regret to inform you that we found some inconsistent details regarding the information you provided regarding the new account of the type PRIME ABC you applied.
+ """;
+ private const string ExpectedFailedAccountDueCreditCheckMessage = """
+ We regret to inform you that your credit score of 500 is insufficient to apply for an account of the type PRIME ABC
+ """;
+
+ private static void AssertMailerStepStateLastMessage(KernelProcess processInfo, string stepName, string? expectedLastMessage)
+ {
+ KernelProcessStepInfo? stepInfo = processInfo.Steps.FirstOrDefault(s => s.State.Name == stepName);
+ Assert.NotNull(stepInfo);
+ var outputStepResult = stepInfo.State as KernelProcessStepState;
+ Assert.NotNull(outputStepResult?.State);
+ Assert.Equal(expectedLastMessage, outputStepResult.State.LastMessageSent);
+ }
+
+ public static void AssertAccountOpeningSuccessMailMessage(KernelProcess processInfo, string stepName)
+ {
+ AssertMailerStepStateLastMessage(processInfo, stepName, ExpectedAccountSuccessMessage);
+ }
+
+ public static void AssertAccountOpeningFailDueFraudMailMessage(KernelProcess processInfo, string stepName)
+ {
+ AssertMailerStepStateLastMessage(processInfo, stepName, ExpectedFailedAccountDueFraudDetectionMessage);
+ }
+
+ public static void AssertAccountOpeningFailDueCreditScoreMailMessage(KernelProcess processInfo, string stepName)
+ {
+ AssertMailerStepStateLastMessage(processInfo, stepName, ExpectedFailedAccountDueCreditCheckMessage);
+ }
+}
diff --git a/dotnet/src/Experimental/Process.Abstractions/Interfaces/IDaprPubsubEventInfo.cs b/dotnet/src/Experimental/Process.Abstractions/Interfaces/IDaprPubsubEventInfo.cs
new file mode 100644
index 000000000000..54835649ef54
--- /dev/null
+++ b/dotnet/src/Experimental/Process.Abstractions/Interfaces/IDaprPubsubEventInfo.cs
@@ -0,0 +1,19 @@
+// Copyright (c) Microsoft. All rights reserved.
+namespace Microsoft.SemanticKernel.Process.Interfaces;
+public interface IDaprPubsubEventInfo
+{
+ ///
+ /// Gets the string of the event name that the function is linked to
+ ///
+ string EventName { get; }
+
+ ///
+ /// When using Dapr Runtime, pubsub name is required to know where to send the specific Dapr event
+ ///
+ string? DaprPubsub { get; }
+
+ ///
+ /// When using Dapr runtime, If daprTopic provided topic will be used instead of eventName, if not provided default will be eventName
+ ///
+ public string? DaprTopic { get; }
+}
diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcess.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcess.cs
index 3b72a9aff192..743851e7b14d 100644
--- a/dotnet/src/Experimental/Process.Abstractions/KernelProcess.cs
+++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcess.cs
@@ -9,13 +9,15 @@ namespace Microsoft.SemanticKernel;
///
/// A serializable representation of a Process.
///
-public sealed record KernelProcess : KernelProcessStepInfo
+public sealed record KernelProcess : KernelProcessStepInfo // TODO: Should be renamed to KernelProcessInfo to keep consistent names
{
///
/// The collection of Steps in the Process.
///
public IList Steps { get; }
+ public KernelProcessEventsSubscriberInfo? EventsSubscriber { get; set; } = null;
+
///
/// Captures Kernel Process State into after process has run
///
@@ -31,12 +33,14 @@ public KernelProcessStateMetadata ToProcessStateMetadata()
/// The process state.
/// The steps of the process.
/// The edges of the process.
- public KernelProcess(KernelProcessState state, IList steps, Dictionary>? edges = null)
+ /// TODO: may need to reorder params
+ public KernelProcess(KernelProcessState state, IList steps, Dictionary>? edges = null, KernelProcessEventsSubscriberInfo? eventsSubscriber = null)
: base(typeof(KernelProcess), state, edges ?? [])
{
Verify.NotNull(steps);
Verify.NotNullOrWhiteSpace(state.Name);
this.Steps = [.. steps];
+ this.EventsSubscriber = eventsSubscriber;
}
}
diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessEdge.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessEdge.cs
index 224d5b67bb56..71b86dd2b472 100644
--- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessEdge.cs
+++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessEdge.cs
@@ -17,6 +17,12 @@ public sealed class KernelProcessEdge
[DataMember]
public string SourceStepId { get; init; }
+ [DataMember]
+ public string SourceEventName { get; init; }
+
+ [DataMember]
+ public string SourceEventId { get; init; }
+
///
/// The collection of s that are the output of the source Step.
///
@@ -26,12 +32,16 @@ public sealed class KernelProcessEdge
///
/// Creates a new instance of the class.
///
- public KernelProcessEdge(string sourceStepId, KernelProcessFunctionTarget outputTarget)
+ public KernelProcessEdge(string sourceStepId, KernelProcessFunctionTarget outputTarget, string sourceEventName, string sourceEventId)
{
Verify.NotNullOrWhiteSpace(sourceStepId);
+ Verify.NotNullOrWhiteSpace(sourceEventId);
+ Verify.NotNullOrWhiteSpace(sourceEventName);
Verify.NotNull(outputTarget);
this.SourceStepId = sourceStepId;
+ this.SourceEventId = sourceEventId;
+ this.SourceEventName = sourceEventName;
this.OutputTarget = outputTarget;
}
}
diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessEventsSubscriber.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessEventsSubscriber.cs
new file mode 100644
index 000000000000..762879182584
--- /dev/null
+++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessEventsSubscriber.cs
@@ -0,0 +1,82 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System;
+using Microsoft.SemanticKernel.Process.Interfaces;
+
+namespace Microsoft.SemanticKernel.Process;
+
+public class KernelProcessEventsSubscriber
+{
+ public IServiceProvider? ServiceProvider { get; init; }
+
+ protected KernelProcessEventsSubscriber() { }
+}
+
+///
+/// Attribute to set Process related steps to link Process Events to specific functions to execute when the event is emitted outside the Process
+///
+/// Enum that contains all process events that could be subscribed to
+public class KernelProcessEventsSubscriber : KernelProcessEventsSubscriber where TEvents : Enum
+{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public KernelProcessEventsSubscriber() { }
+
+ ///
+ /// Attribute to set Process related steps to link Process Events to specific functions to execute when the event is emitted outside the Process
+ ///
+ [AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
+ public sealed class ProcessEventSubscriberAttribute : Attribute, IDaprPubsubEventInfo
+ {
+ private string GetEventName(TEvents eventEnum)
+ {
+ return Enum.GetName(typeof(TEvents), eventEnum) ?? "";
+ }
+
+ ///
+ /// Gets the enum of the event that the function is linked to
+ ///
+ public TEvents EventEnum { get; }
+
+ ///
+ /// Gets the string of the event name that the function is linked to
+ ///
+ public string EventName { get; }
+
+ #region Dapr Runtime related properties
+ ///
+ /// When using Dapr Runtime, pubsub name is required to know where to send the specific Dapr event
+ ///
+ public string? DaprPubsub { get; }
+
+ ///
+ /// When using Dapr runtime, If daprTopic provided topic will be used instead of eventName, if not provided default will be eventName
+ ///
+ public string? DaprTopic { get; }
+ #endregion
+
+ ///
+ /// Initializes the attribute.
+ ///
+ /// Specific Process Event enum
+ public ProcessEventSubscriberAttribute(TEvents eventEnum)
+ {
+ this.EventEnum = eventEnum;
+ this.EventName = this.GetEventName(eventEnum);
+ // No Dapr related properties specified
+ this.DaprPubsub = null;
+ this.DaprTopic = null;
+ }
+
+ public ProcessEventSubscriberAttribute(TEvents eventEnum, string daprPubSub, string? daprTopic = null)
+ {
+ this.EventEnum = eventEnum;
+ this.EventName = this.GetEventName(eventEnum);
+ // Dapr related properties specified
+ // If not providing alternate topic name, process event name is used as topic
+ this.DaprPubsub = daprPubSub;
+ this.DaprTopic = daprTopic ?? this.EventName;
+ }
+ }
+}
diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessEventsSubscriberInfo.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessEventsSubscriberInfo.cs
new file mode 100644
index 000000000000..8784342f1482
--- /dev/null
+++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessEventsSubscriberInfo.cs
@@ -0,0 +1,201 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using Microsoft.SemanticKernel.Process;
+using Microsoft.SemanticKernel.Process.Interfaces;
+
+namespace Microsoft.SemanticKernel;
+
+public record EventSubscriptionActions
+{
+ public MethodInfo? LocalRuntimeAction { get; set; }
+
+ public IDaprPubsubEventInfo? DaprRuntimeAction { get; set; }
+}
+
+public class KernelProcessEventsSubscriberInfo
+{
+ // key: process event id, value: actions linked to specific event and should be emitted when event triggered
+ private readonly Dictionary _eventHandlers = [];
+
+ // key: source id, value: list of full internal event id
+ private readonly Dictionary> _eventsBySourceMap = [];
+
+ // key: full internal event id, value: process event id linked
+ private readonly Dictionary _stepEventProcessEventMap = [];
+
+ // potentially _processEventSubscriberType, _subscriberServiceProvider, _processEventSubscriber can be converted to a dictionary to support
+ // many unique subscriber classes that could be linked to different ServiceProviders
+ private Type? _processEventSubscriberType = null;
+ private IServiceProvider? _subscriberServiceProvider = null;
+ private KernelProcessEventsSubscriber? _processEventSubscriber = null;
+
+ private void InitializeProcessEventSubscriber()
+ {
+ if (this._processEventSubscriber == null && this._processEventSubscriberType != null)
+ {
+ try
+ {
+ this._processEventSubscriber = (KernelProcessEventsSubscriber?)Activator.CreateInstance(this._processEventSubscriberType, []);
+ this._processEventSubscriberType.GetProperty(nameof(KernelProcessEventsSubscriber.ServiceProvider))?.SetValue(this._processEventSubscriber, this._subscriberServiceProvider);
+ }
+ catch (Exception)
+ {
+ throw new KernelException($"Could not create an instance of {this._processEventSubscriberType.Name} to be used in KernelProcessSubscriberInfo");
+ }
+ }
+ }
+
+ ///
+ /// Used by LocalRuntime to execute linked actions
+ ///
+ ///
+ ///
+ private void InvokeProcessEvent(string eventName, object? data)
+ {
+ if (this._processEventSubscriberType != null && this._eventHandlers.TryGetValue(eventName, out EventSubscriptionActions? linkedAction) && linkedAction != null)
+ {
+ this.InitializeProcessEventSubscriber();
+ linkedAction.LocalRuntimeAction?.Invoke(this._processEventSubscriber, [data]);
+ }
+ }
+
+ private void Subscribe(string eventName, MethodInfo method, IDaprPubsubEventInfo daprInfo)
+ {
+ if (this._eventHandlers.TryGetValue(eventName, out var linkedAction) && linkedAction != null)
+ {
+ linkedAction.LocalRuntimeAction = method;
+ linkedAction.DaprRuntimeAction = daprInfo;
+ return;
+ }
+
+ throw new InvalidOperationException($"Cannot link method {method.Name} to event {eventName}, must make use of EmitAsProcessEvent first or remove unused event from event subscriber.");
+ }
+
+ public KernelProcessEventsSubscriber? GetProcessEventsSubscriberInstance()
+ {
+ this.InitializeProcessEventSubscriber();
+ return this._processEventSubscriber;
+ }
+
+ public void LinkStepEventToProcessEvent(string stepEventId, string processEventId, string sourceId)
+ {
+ this._stepEventProcessEventMap.Add(stepEventId, processEventId);
+ if (!this._eventHandlers.ContainsKey(processEventId))
+ {
+ this._eventHandlers.Add(processEventId, new() { LocalRuntimeAction = null, DaprRuntimeAction = null });
+ }
+
+ if (!this._eventsBySourceMap.TryGetValue(sourceId, out List? value))
+ {
+ value = [];
+ this._eventsBySourceMap.Add(sourceId, value);
+ }
+
+ value.Add(stepEventId);
+ }
+
+ public bool TryGetLinkedProcessEvent(string stepEventId, out string? processEvent)
+ {
+ return this._stepEventProcessEventMap.TryGetValue(stepEventId, out processEvent);
+ }
+
+ public IDictionary GetLinkedDaprPublishEventsInfoBySource(string sourceId)
+ {
+ var daprEventInfo = new Dictionary();
+ if (this._eventsBySourceMap.TryGetValue(sourceId, out var stepEvents) && stepEvents.Count > 0)
+ {
+ foreach (var stepEvent in stepEvents)
+ {
+ if (!string.IsNullOrEmpty(stepEvent) &&
+ this._stepEventProcessEventMap.TryGetValue(stepEvent, out var processEvent) && !string.IsNullOrEmpty(processEvent) &&
+ this._eventHandlers.TryGetValue(processEvent, out var eventAction) &&
+ eventAction != null && eventAction.DaprRuntimeAction != null)
+ {
+ // validate dapr pubsub requirements
+ if (string.IsNullOrEmpty(eventAction.DaprRuntimeAction.DaprPubsub))
+ {
+ throw new InvalidOperationException($"Event subscriber for event {eventAction.DaprRuntimeAction.EventName} must have dapr pubsub defined");
+ }
+ if (string.IsNullOrEmpty(eventAction.DaprRuntimeAction.DaprTopic))
+ {
+ throw new InvalidOperationException($"Event subscriber for event {eventAction.DaprRuntimeAction.EventName} must have dapr topic defined");
+ }
+
+ daprEventInfo.Add(stepEvent, eventAction.DaprRuntimeAction);
+ }
+ }
+ }
+
+ return daprEventInfo;
+ }
+
+ ///
+ /// Used in Localruntime only
+ ///
+ ///
+ ///
+ public void TryInvokeProcessEventFromStepMessage(string stepEventId, object? data)
+ {
+ if (this.TryGetLinkedProcessEvent(stepEventId, out string? processEvent) && !string.IsNullOrEmpty(processEvent))
+ {
+ this.InvokeProcessEvent(processEvent!, data);
+ }
+ }
+
+ ///
+ /// Extracts the event properties and function details of the functions with the annotator
+ ///
+ ///
+ /// Type of the class that make uses of the annotators and contains the functionality to be executed
+ /// Enum that contains the process subscribable events
+ ///
+ public void SubscribeToEventsFromClass(IServiceProvider? serviceProvider = null) where TEventListeners : KernelProcessEventsSubscriber where TEvents : Enum
+ {
+ if (this._processEventSubscriberType != null)
+ {
+ throw new InvalidOperationException("Already linked process to another event subscriber class");
+ }
+
+ var methods = typeof(TEventListeners).GetMethods(BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.DeclaredOnly);
+ if (methods.Length == 0)
+ {
+ throw new InvalidOperationException($"The Event Listener type {typeof(TEventListeners).Name} has no functions to extract subscribe methods");
+ }
+
+ bool annotationsFound = false;
+ foreach (var method in methods)
+ {
+ if (method.GetCustomAttributes(typeof(KernelProcessEventsSubscriber<>.ProcessEventSubscriberAttribute), false).FirstOrDefault() is KernelProcessEventsSubscriber.ProcessEventSubscriberAttribute attribute)
+ {
+ if (attribute.EventEnum.GetType() != typeof(TEvents))
+ {
+ throw new InvalidOperationException($"The event type {attribute.EventEnum.GetType().Name} does not match the expected type {typeof(TEvents).Name}");
+ }
+
+ this.Subscribe(attribute.EventName, method, attribute);
+ annotationsFound = true;
+ }
+ }
+
+ if (!annotationsFound)
+ {
+ throw new InvalidOperationException($"The Event Listener type {typeof(TEventListeners).Name} has functions with no ProcessEventSubscriber Annotations");
+ }
+
+ this._subscriberServiceProvider = serviceProvider;
+ this._processEventSubscriberType = typeof(TEventListeners);
+ }
+
+ public IEnumerable GetLinkedStepIdsToProcessEventName(string processEventId)
+ {
+ return this._stepEventProcessEventMap
+ .Where(kv => kv.Value == processEventId)
+ .Select(kv => kv.Key);
+ }
+
+ public KernelProcessEventsSubscriberInfo() { }
+}
diff --git a/dotnet/src/Experimental/Process.Core/ProcessBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessBuilder.cs
index e475017af6ba..172f6308d84c 100644
--- a/dotnet/src/Experimental/Process.Core/ProcessBuilder.cs
+++ b/dotnet/src/Experimental/Process.Core/ProcessBuilder.cs
@@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Models;
@@ -11,7 +12,7 @@ namespace Microsoft.SemanticKernel;
///
/// Provides functionality for incrementally defining a process.
///
-public sealed class ProcessBuilder : ProcessStepBuilder
+public class ProcessBuilder : ProcessStepBuilder
{
/// The collection of steps within this process.
private readonly List _steps = [];
@@ -22,6 +23,8 @@ public sealed class ProcessBuilder : ProcessStepBuilder
/// Maps external input event Ids to the target entry step for the event.
private readonly Dictionary _externalEventTargetMap = [];
+ internal KernelProcessEventsSubscriberInfo _eventsSubscriber;
+
///
/// A boolean indicating if the current process is a step within another process.
///
@@ -301,7 +304,7 @@ public KernelProcess Build(KernelProcessStateMetadata? stateMetadata = null)
// Create the process
KernelProcessState state = new(this.Name, version: this.Version, id: this.HasParentProcess ? this.Id : null);
- KernelProcess process = new(state, builtSteps, builtEdges);
+ KernelProcess process = new(state, builtSteps, builtEdges, this._eventsSubscriber);
return process;
}
@@ -313,7 +316,91 @@ public KernelProcess Build(KernelProcessStateMetadata? stateMetadata = null)
public ProcessBuilder(string name)
: base(name)
{
+ this._eventsSubscriber = new();
}
#endregion
}
+
+///
+/// Provides functionality for incrementally defining a process with specific input/output process events
+///
+public sealed class ProcessBuilder : ProcessBuilder where TEvents : Enum, new()
+{
+ private readonly Dictionary _eventNames = [];
+
+ private void PopulateEventNames()
+ {
+ foreach (TEvents processEvent in Enum.GetValues(typeof(TEvents)))
+ {
+ this._eventNames.Add(processEvent, Enum.GetName(typeof(TEvents), processEvent)!);
+ }
+ }
+
+ #region Public Interface
+ ///
+ /// Retrieve string name of the string value
+ ///
+ ///
+ /// string of the process event enum
+ public string GetEventName(TEvents processEvent)
+ {
+ return this._eventNames[processEvent];
+ }
+
+ ///
+ /// Method that imports a specific KernelProcessEventSubscriber class type
+ /// to be used when specific TEvents get triggered inside the SK Process
+ ///
+ /// Type of the class that contains the custom event subscriber definition
+ /// services that the subscribers in the TEventListeners make use of
+ public void LinkEventSubscribersFromType(IServiceProvider? serviceProvider = null) where TEventListeners : KernelProcessEventsSubscriber
+ {
+ this._eventsSubscriber.SubscribeToEventsFromClass(serviceProvider);
+ }
+
+ ///
+ public ProcessEdgeBuilder OnInputEvent(TEvents eventId)
+ {
+ return this.OnInputEvent(this.GetEventName(eventId));
+ }
+
+ ///
+ public ProcessFunctionTargetBuilder WhereInputEventIs(TEvents eventId)
+ {
+ return this.WhereInputEventIs(this.GetEventName(eventId));
+ }
+
+ ///
+ public ProcessEdgeBuilder GetProcessEvent(TEvents processEvent)
+ {
+ return this.OnInputEvent(this.GetEventName(processEvent));
+ }
+
+ ///
+ /// Similar to but
+ /// specific to make use of ProcessEvents defined by
+ ///
+ /// process event type
+ ///
+ ///
+ public ProcessStepEdgeBuilder OnProcessEvent(TEvents eventId)
+ {
+ var eventName = this.GetEventName(eventId);
+ var linkedEventIds = this._eventsSubscriber.GetLinkedStepIdsToProcessEventName(eventName);
+
+ if (linkedEventIds == null || linkedEventIds?.Count() == 0)
+ {
+ throw new InvalidOperationException($"Could not find linked steps to process event {eventName}");
+ }
+
+ return base.OnEvent(eventName);
+ }
+
+ ///
+ public ProcessBuilder(string name) : base(name)
+ {
+ this.PopulateEventNames();
+ }
+ #endregion
+}
diff --git a/dotnet/src/Experimental/Process.Core/ProcessEdgeBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessEdgeBuilder.cs
index 6de3a24be770..2bc59f72a851 100644
--- a/dotnet/src/Experimental/Process.Core/ProcessEdgeBuilder.cs
+++ b/dotnet/src/Experimental/Process.Core/ProcessEdgeBuilder.cs
@@ -46,7 +46,7 @@ public ProcessEdgeBuilder SendEventTo(ProcessFunctionTargetBuilder target)
}
this.Target = target;
- ProcessStepEdgeBuilder edgeBuilder = new(this.Source, this.EventId) { Target = this.Target };
+ ProcessStepEdgeBuilder edgeBuilder = new(this.Source, this.EventId, this.EventId) { Target = this.Target };
this.Source.LinkTo(this.EventId, edgeBuilder);
return new ProcessEdgeBuilder(this.Source, this.EventId);
diff --git a/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs
index cc1530c953de..7f779cc79988 100644
--- a/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs
+++ b/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs
@@ -35,13 +35,12 @@ public abstract class ProcessStepBuilder
///
/// Define the behavior of the step when the event with the specified Id is fired.
///
- /// The Id of the event of interest.
/// An instance of .
- public ProcessStepEdgeBuilder OnEvent(string eventId)
+ public ProcessStepEdgeBuilder OnEvent(string eventName)
{
// scope the event to this instance of this step
- var scopedEventId = this.GetScopedEventId(eventId);
- return new ProcessStepEdgeBuilder(this, scopedEventId);
+ var scopedEventId = this.GetScopedEventId(eventName);
+ return new ProcessStepEdgeBuilder(this, scopedEventId, eventName);
}
///
diff --git a/dotnet/src/Experimental/Process.Core/ProcessStepEdgeBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessStepEdgeBuilder.cs
index f734959fb6fe..c9d539a718ca 100644
--- a/dotnet/src/Experimental/Process.Core/ProcessStepEdgeBuilder.cs
+++ b/dotnet/src/Experimental/Process.Core/ProcessStepEdgeBuilder.cs
@@ -14,9 +14,15 @@ public sealed class ProcessStepEdgeBuilder
///
/// The event Id that the edge fires on.
+ /// Unique event Id linked to the source id.
///
internal string EventId { get; }
+ ///
+ /// The event name that the edge fires on.
+ ///
+ internal string EventName { get; }
+
///
/// The source step of the edge.
///
@@ -27,13 +33,16 @@ public sealed class ProcessStepEdgeBuilder
///
/// The source step.
/// The Id of the event.
- internal ProcessStepEdgeBuilder(ProcessStepBuilder source, string eventId)
+ /// The name of the event
+ internal ProcessStepEdgeBuilder(ProcessStepBuilder source, string eventId, string eventName)
{
Verify.NotNull(source, nameof(source));
Verify.NotNullOrWhiteSpace(eventId, nameof(eventId));
+ Verify.NotNullOrWhiteSpace(eventName, nameof(eventName));
this.Source = source;
this.EventId = eventId;
+ this.EventName = eventName;
}
///
@@ -44,7 +53,7 @@ internal KernelProcessEdge Build()
Verify.NotNull(this.Source?.Id);
Verify.NotNull(this.Target);
- return new KernelProcessEdge(this.Source.Id, this.Target.Build());
+ return new KernelProcessEdge(this.Source.Id, this.Target.Build(), this.EventName, this.EventId);
}
///
@@ -67,7 +76,20 @@ public ProcessStepEdgeBuilder SendEventTo(ProcessFunctionTargetBuilder target)
this.Target = target;
this.Source.LinkTo(this.EventId, this);
- return new ProcessStepEdgeBuilder(this.Source, this.EventId);
+ return new ProcessStepEdgeBuilder(this.Source, this.EventId, this.EventName);
+ }
+
+ ///
+ /// Forward specific step events to process events so specific functions linked get executed
+ /// when receiving the specific event
+ ///
+ ///
+ ///
+ public ProcessStepEdgeBuilder EmitAsProcessEvent(ProcessEdgeBuilder processEdge)
+ {
+ processEdge.Source._eventsSubscriber?.LinkStepEventToProcessEvent(this.EventId, processEventId: processEdge.EventId, sourceId: this.Source.Id);
+
+ return new ProcessStepEdgeBuilder(this.Source, this.EventId, this.EventName);
}
///
diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessTests.cs b/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessTests.cs
index d5d2ca19934e..28a4cc62afe4 100644
--- a/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessTests.cs
+++ b/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessTests.cs
@@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.SemanticKernel;
+using Microsoft.SemanticKernel.Process;
using SemanticKernel.IntegrationTests.TestSettings;
using Xunit;
#pragma warning restore IDE0005 // Using directive is unnecessary.
@@ -326,6 +327,81 @@ public async Task ProcessWith2NestedSubprocessSequentiallyAndMultipleOutputSteps
this.AssertStepStateLastMessage(processInfo, lastStepName, expectedLastMessage: $"{testInput}-{testInput}-{testInput}-{testInput}-{testInput}");
}
+ ///
+ ///
+ /// ┌─────┐ ┌─────┐
+ /// StartEvent ──►│ A ├──┬─►│ B ├───► LastEvent
+ /// └─────┘ ▼ └─────┘
+ /// MidEvent
+ ///
+ ///
+ ///
+ [Fact]
+ public async Task ProcessWithEventSubscriberAsync()
+ {
+ // Arrange
+ Kernel kernel = this._kernelBuilder.Build();
+ var processBuilder = new ProcessBuilder(nameof(ProcessWithEventSubscriberAsync));
+
+ var repeatStepA = processBuilder.AddStepFromType("stepA");
+ var repeatStepB = processBuilder.AddStepFromType("stepB");
+
+ processBuilder
+ .OnInputEvent(TestProcessEvents.StartEvent)
+ .SendEventTo(new ProcessFunctionTargetBuilder(repeatStepA, parameterName: "message"));
+
+ repeatStepA
+ .OnEvent(ProcessTestsEvents.OutputReadyInternal)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(TestProcessEvents.MidEvent))
+ .SendEventTo(new ProcessFunctionTargetBuilder(repeatStepB, parameterName: "message"));
+
+ repeatStepB
+ .OnEvent(ProcessTestsEvents.OutputReadyInternal)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(TestProcessEvents.LastEvent))
+ .StopProcess();
+
+ processBuilder.LinkEventSubscribersFromType();
+ var process = processBuilder.Build();
+
+ // Act
+ string testInput = "SomeData";
+ var eventSubscriberInstance = (TestEventSubscriber)process.EventsSubscriber.GetProcessEventsSubscriberInstance();
+ var processHandle = await this._fixture.StartProcessAsync(process, kernel, new KernelProcessEvent() { Id = processBuilder.GetEventName(TestProcessEvents.StartEvent), Data = testInput });
+ var processInfo = await processHandle.GetStateAsync();
+
+ // Assert
+ Assert.NotNull(processInfo);
+ // checks below only work for LocalRuntime
+ Assert.NotNull(eventSubscriberInstance);
+ Assert.Equal(string.Join(" ", Enumerable.Repeat(testInput, 2)), eventSubscriberInstance.MidEventResult);
+ Assert.Equal(string.Join(" ", Enumerable.Repeat(testInput, 4)), eventSubscriberInstance.LastEventResult);
+ }
+
+ private enum TestProcessEvents
+ {
+ StartEvent,
+ MidEvent,
+ LastEvent,
+ }
+
+ private sealed class TestEventSubscriber : KernelProcessEventsSubscriber
+ {
+ public string? MidEventResult { get; set; } = null;
+ public string? LastEventResult { get; set; } = null;
+
+ [ProcessEventSubscriber(TestProcessEvents.MidEvent)]
+ public void OnMidEventReceived(string content)
+ {
+ this.MidEventResult = content;
+ }
+
+ [ProcessEventSubscriber(TestProcessEvents.LastEvent)]
+ public void OnLastEventReceived(string content)
+ {
+ this.LastEventResult = content;
+ }
+ }
+
#region Predefined ProcessBuilders for testing
///
/// Sample long sequential process, each step has a delay.
diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs
index b7a6695996f4..1b01caafa582 100644
--- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs
+++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs
@@ -269,6 +269,9 @@ private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSuperstep
List messageTasks = [];
foreach (var message in messagesToProcess)
{
+ // Check if message has external event handler linked to it
+ this.TryEmitMessageToExternalSubscribers(message);
+
// Check for end condition
if (message.DestinationId.Equals(ProcessConstants.EndStepName, StringComparison.OrdinalIgnoreCase))
{
@@ -300,6 +303,25 @@ private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSuperstep
return;
}
+ //private string? TryGetLinkedExternalProcessEvent()
+ //{
+ // this._process.EventsSubscriber.
+ //}
+
+ private void TryEmitMessageToExternalSubscribers(string processEventId, object? processEventData)
+ {
+ string? processEventName = null;
+ this._process.EventsSubscriber?.TryInvokeProcessEventFromStepMessage(processEventId, processEventData);
+ if (string.IsNullOrEmpty(processEventName))
+ {
+ }
+ }
+
+ private void TryEmitMessageToExternalSubscribers(ProcessMessage message)
+ {
+ this.TryEmitMessageToExternalSubscribers(message.EventId, message.TargetEventData);
+ }
+
///
/// Processes external events that have been sent to the process, translates them to s, and enqueues
/// them to the provided message channel so that they can be processed in the next superstep.
@@ -331,6 +353,13 @@ private void EnqueueStepMessages(LocalStep step, Queue messageCh
var allStepEvents = step.GetAllEvents();
foreach (ProcessEvent stepEvent in allStepEvents)
{
+ if (this._process.EventsSubscriber != null && this._process.EventsSubscriber.TryGetLinkedProcessEvent(stepEvent.QualifiedId, out var processEventName) && !string.IsNullOrEmpty(processEventName))
+ {
+ // Since it is a subscribed to event making public in case it wasn't and renaming event name to match process name
+ var processEvent = stepEvent with { SourceId = processEventName!, Visibility = KernelProcessEventVisibility.Public };
+ base.EmitEvent(processEvent);
+ }
+
// Emit the event out of the process (this one) if it's visibility is public.
if (stepEvent.Visibility == KernelProcessEventVisibility.Public)
{
@@ -347,9 +376,9 @@ private void EnqueueStepMessages(LocalStep step, Queue messageCh
}
// Error event was raised with no edge to handle it, send it to an edge defined as the global error target.
- if (!foundEdge && stepEvent.IsError)
+ if (!foundEdge)
{
- if (this._outputEdges.TryGetValue(ProcessConstants.GlobalErrorEventId, out List? edges))
+ if (stepEvent.IsError && this._outputEdges.TryGetValue(ProcessConstants.GlobalErrorEventId, out List? edges))
{
foreach (KernelProcessEdge edge in edges)
{
@@ -357,6 +386,11 @@ private void EnqueueStepMessages(LocalStep step, Queue messageCh
messageChannel.Enqueue(message);
}
}
+ else
+ {
+ // Checking in case the step with no edges linked to it has event that should be emitted externally
+ this.TryEmitMessageToExternalSubscribers(stepEvent.QualifiedId, stepEvent.Data);
+ }
}
}
}
@@ -371,7 +405,7 @@ private async Task ToKernelProcessAsync()
var processState = new KernelProcessState(this.Name, this._stepState.Version, this.Id);
var stepTasks = this._steps.Select(step => step.ToKernelProcessStepInfoAsync()).ToList();
var steps = await Task.WhenAll(stepTasks).ConfigureAwait(false);
- return new KernelProcess(processState, steps, this._outputEdges);
+ return new KernelProcess(processState, steps, this._outputEdges, this._process.EventsSubscriber);
}
///
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr.UnitTests/ProcessMessageSerializationTests.cs b/dotnet/src/Experimental/Process.Runtime.Dapr.UnitTests/ProcessMessageSerializationTests.cs
index 3ed0e0c42cfb..cdb13ffce6fe 100644
--- a/dotnet/src/Experimental/Process.Runtime.Dapr.UnitTests/ProcessMessageSerializationTests.cs
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr.UnitTests/ProcessMessageSerializationTests.cs
@@ -127,7 +127,7 @@ private static void VerifyContainerSerialization(ProcessMessage[] processMessage
private static ProcessMessage CreateMessage(Dictionary values)
{
- return new ProcessMessage("test-source", "test-destination", "test-function", values)
+ return new ProcessMessage("test-event", "test-eventid", "test-source", "test-destination", "test-function", values)
{
TargetEventData = "testdata",
TargetEventId = "targetevent",
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs
index 5651a7858dd0..8d2d02983b4b 100644
--- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs
@@ -466,7 +466,7 @@ private async Task ToDaprProcessInfoAsync()
var processState = new KernelProcessState(this.Name, this._process!.State.Version, this.Id.GetId());
var stepTasks = this._steps.Select(step => step.ToDaprStepInfoAsync()).ToList();
var steps = await Task.WhenAll(stepTasks).ConfigureAwait(false);
- return new DaprProcessInfo { InnerStepDotnetType = this._process!.InnerStepDotnetType, Edges = this._process!.Edges, State = processState, Steps = [.. steps] };
+ return new DaprProcessInfo { InnerStepDotnetType = this._process!.InnerStepDotnetType, Edges = this._process!.Edges, State = processState, Steps = [.. steps], ExternalEventsMap = [] };
}
///
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/PubsubMessageActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/PubsubMessageActor.cs
new file mode 100644
index 000000000000..cb2de1cd58ce
--- /dev/null
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/PubsubMessageActor.cs
@@ -0,0 +1,39 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System.Runtime.Serialization;
+using System.Text.Json.Serialization;
+using System.Threading.Tasks;
+using Dapr.Actors.Runtime;
+using Dapr.Client;
+using Microsoft.SemanticKernel.Process;
+using Microsoft.SemanticKernel.Process.Interfaces;
+using Microsoft.SemanticKernel.Process.Runtime;
+
+namespace Microsoft.SemanticKernel;
+
+///
+/// Contains information about a Step in a Dapr Process including it's state and edges.
+///
+[KnownType(typeof(KernelProcessEdge))]
+[KnownType(typeof(KernelProcessStepState))]
+[KnownType(typeof(DaprProcessInfo))]
+[KnownType(typeof(DaprMapInfo))]
+[JsonDerivedType(typeof(DaprProcessInfo))]
+[JsonDerivedType(typeof(DaprMapInfo))]
+public class PubsubMessageActor : Actor, IPubsubMessage
+{
+ private readonly DaprClient _daprClient;
+
+ public PubsubMessageActor(ActorHost host, DaprClient daprClient) : base(host)
+ {
+ this._daprClient = daprClient;
+ }
+
+ public async Task EmitPubsubMessageAsync(ProcessEvent processEvent, DaprPubsubEventData daprDetails)
+ {
+ Verify.NotNullOrWhiteSpace(daprDetails.PubsubName);
+ Verify.NotNullOrWhiteSpace(daprDetails.TopicName);
+
+ await this._daprClient.PublishEventAsync(daprDetails.PubsubName, daprDetails.TopicName, processEvent).ConfigureAwait(true);
+ }
+}
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs
index f5445bdf0afc..9f44c866809f 100644
--- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs
@@ -11,6 +11,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.SemanticKernel.Process.Interfaces;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Runtime;
using Microsoft.SemanticKernel.Process.Serialization;
@@ -167,7 +168,7 @@ public virtual async Task ToDaprStepInfoAsync()
// This allows state information to be extracted even if the step has not been activated.
await this._activateTask.Value.ConfigureAwait(false);
- var stepInfo = new DaprStepInfo { InnerStepDotnetType = this._stepInfo!.InnerStepDotnetType!, State = this._stepInfo.State, Edges = this._stepInfo.Edges! };
+ var stepInfo = new DaprStepInfo { InnerStepDotnetType = this._stepInfo!.InnerStepDotnetType!, State = this._stepInfo.State, Edges = this._stepInfo.Edges!, ExternalEventsMap = [] };
return stepInfo;
}
@@ -403,6 +404,14 @@ private Task InvokeFunction(KernelFunction function, Kernel kern
/// The event to emit.
internal async ValueTask EmitEventAsync(ProcessEvent daprEvent)
{
+ // If event is linked to be exposed externally it should be marked as public and emitted via
+ // pubsub if linked
+ if (this._stepInfo != null && this._stepInfo.ExternalEventsMap.TryGetValue(daprEvent.QualifiedId, out var daprInfo) && daprInfo != null)
+ {
+ IPubsubMessage pubsubPublisher = this.ProxyFactory.CreateActorProxy(new ActorId($"{this.Id}-{daprEvent.QualifiedId}"), nameof(PubsubMessageActor));
+ await pubsubPublisher.EmitPubsubMessageAsync(daprEvent, daprInfo).ConfigureAwait(false);
+ }
+
// Emit the event out of the process (this one) if it's visibility is public.
if (daprEvent.Visibility == KernelProcessEventVisibility.Public)
{
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprMapInfo.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprMapInfo.cs
index 06b3193f6691..af7ab0358d0c 100644
--- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprMapInfo.cs
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprMapInfo.cs
@@ -50,8 +50,8 @@ public static DaprMapInfo FromKernelProcessMap(KernelProcessMap processMap)
DaprStepInfo operationInfo =
processMap.Operation is KernelProcess processOperation
? DaprProcessInfo.FromKernelProcess(processOperation)
- : DaprStepInfo.FromKernelStepInfo(processMap.Operation);
- DaprStepInfo mapStepInfo = DaprStepInfo.FromKernelStepInfo(processMap);
+ : DaprStepInfo.FromKernelStepInfo(processMap.Operation, []);
+ DaprStepInfo mapStepInfo = DaprStepInfo.FromKernelStepInfo(processMap, []);
return new DaprMapInfo
{
@@ -59,6 +59,7 @@ processMap.Operation is KernelProcess processOperation
State = mapStepInfo.State,
Edges = mapStepInfo.Edges,
Operation = operationInfo,
+ ExternalEventsMap = []
};
}
}
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprProcessInfo.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprProcessInfo.cs
index e0ed2064b95f..76d72893ec7d 100644
--- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprProcessInfo.cs
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprProcessInfo.cs
@@ -1,7 +1,10 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
+using System.Linq;
using System.Runtime.Serialization;
+using Microsoft.SemanticKernel.Process;
+using Microsoft.SemanticKernel.Process.Interfaces;
namespace Microsoft.SemanticKernel;
@@ -62,11 +65,15 @@ public static DaprProcessInfo FromKernelProcess(KernelProcess kernelProcess)
{
Verify.NotNull(kernelProcess);
- DaprStepInfo daprStepInfo = DaprStepInfo.FromKernelStepInfo(kernelProcess);
+ var processDaprEvents = ConvertToStepPubsubData(kernelProcess.EventsSubscriber?.GetLinkedDaprPublishEventsInfoBySource(kernelProcess.State.Id!));
+ DaprStepInfo daprStepInfo = DaprStepInfo.FromKernelStepInfo(kernelProcess, []);
List daprSteps = [];
foreach (var step in kernelProcess.Steps)
{
+ var externalEvents = kernelProcess.EventsSubscriber?.GetLinkedDaprPublishEventsInfoBySource(step.State.Id!);
+ var daprEvents = ConvertToStepPubsubData(externalEvents);
+
if (step is KernelProcess processStep)
{
daprSteps.Add(DaprProcessInfo.FromKernelProcess(processStep));
@@ -77,7 +84,7 @@ public static DaprProcessInfo FromKernelProcess(KernelProcess kernelProcess)
}
else
{
- daprSteps.Add(DaprStepInfo.FromKernelStepInfo(step));
+ daprSteps.Add(DaprStepInfo.FromKernelStepInfo(step, daprEvents));
}
}
@@ -87,6 +94,22 @@ public static DaprProcessInfo FromKernelProcess(KernelProcess kernelProcess)
State = daprStepInfo.State,
Edges = daprStepInfo.Edges,
Steps = daprSteps,
+ ExternalEventsMap = processDaprEvents,
};
}
+
+ private static Dictionary ConvertToStepPubsubData(IDictionary? daprEvents)
+ {
+ if (daprEvents == null)
+ {
+ return [];
+ }
+
+ return daprEvents.ToDictionary(e => e.Key, e => new DaprPubsubEventData()
+ {
+ ProcessEventName = e.Value.EventName,
+ PubsubName = e.Value.DaprPubsub!,
+ TopicName = e.Value.DaprTopic!,
+ });
+ }
}
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprPubsubEventData.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprPubsubEventData.cs
new file mode 100644
index 000000000000..f326441a84a7
--- /dev/null
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprPubsubEventData.cs
@@ -0,0 +1,15 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System.Runtime.Serialization;
+
+namespace Microsoft.SemanticKernel.Process;
+
+[KnownType(typeof(DaprPubsubEventData))]
+public record DaprPubsubEventData
+{
+ public required string PubsubName { get; init; }
+
+ public required string ProcessEventName { get; init; }
+
+ public required string TopicName { get; init; }
+}
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs
index 777814770fdb..4ea1c8f1e5e8 100644
--- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs
@@ -5,6 +5,7 @@
using System.Linq;
using System.Runtime.Serialization;
using System.Text.Json.Serialization;
+using Microsoft.SemanticKernel.Process;
namespace Microsoft.SemanticKernel;
@@ -34,6 +35,9 @@ public record DaprStepInfo
///
public required Dictionary> Edges { get; init; }
+ // key: original event name, value: event name pubsub "topic" related data
+ public required Dictionary ExternalEventsMap { get; init; }
+
///
/// Builds an instance of from the current object.
///
@@ -54,7 +58,7 @@ public KernelProcessStepInfo ToKernelProcessStepInfo()
/// Initializes a new instance of the class from an instance of .
///
/// An instance of
- public static DaprStepInfo FromKernelStepInfo(KernelProcessStepInfo kernelStepInfo)
+ public static DaprStepInfo FromKernelStepInfo(KernelProcessStepInfo kernelStepInfo, Dictionary daprEvents)
{
Verify.NotNull(kernelStepInfo, nameof(kernelStepInfo));
@@ -63,6 +67,7 @@ public static DaprStepInfo FromKernelStepInfo(KernelProcessStepInfo kernelStepIn
InnerStepDotnetType = kernelStepInfo.InnerStepType.AssemblyQualifiedName!,
State = kernelStepInfo.State,
Edges = kernelStepInfo.Edges.ToDictionary(kvp => kvp.Key, kvp => new List(kvp.Value)),
+ ExternalEventsMap = daprEvents,
};
}
}
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IPubsubMessage.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IPubsubMessage.cs
new file mode 100644
index 000000000000..d1aed00d6ff5
--- /dev/null
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IPubsubMessage.cs
@@ -0,0 +1,11 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System.Threading.Tasks;
+using Dapr.Actors;
+using Microsoft.SemanticKernel.Process.Runtime;
+
+namespace Microsoft.SemanticKernel.Process.Interfaces;
+public interface IPubsubMessage: IActor
+{
+ Task EmitPubsubMessageAsync(ProcessEvent processEvent, DaprPubsubEventData daprDetails);
+}
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs
index 52f86899d608..6f8ff81f88a8 100644
--- a/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs
@@ -22,5 +22,6 @@ public static void AddProcessActors(this ActorRuntimeOptions actorOptions)
actorOptions.Actors.RegisterActor();
actorOptions.Actors.RegisterActor();
actorOptions.Actors.RegisterActor();
+ actorOptions.Actors.RegisterActor();
}
}
diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Process.Runtime.Dapr.csproj b/dotnet/src/Experimental/Process.Runtime.Dapr/Process.Runtime.Dapr.csproj
index e30b9c1716cb..726d890b29f7 100644
--- a/dotnet/src/Experimental/Process.Runtime.Dapr/Process.Runtime.Dapr.csproj
+++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Process.Runtime.Dapr.csproj
@@ -33,6 +33,7 @@
+
diff --git a/dotnet/src/Experimental/Process.UnitTests/Core/ProcessBuilderTests.cs b/dotnet/src/Experimental/Process.UnitTests/Core/ProcessBuilderTests.cs
index 5c0a9527a41f..52aacdb6270b 100644
--- a/dotnet/src/Experimental/Process.UnitTests/Core/ProcessBuilderTests.cs
+++ b/dotnet/src/Experimental/Process.UnitTests/Core/ProcessBuilderTests.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
+using System.Threading.Tasks;
using Xunit;
namespace Microsoft.SemanticKernel.Process.Core.UnitTests;
@@ -10,6 +11,16 @@ namespace Microsoft.SemanticKernel.Process.Core.UnitTests;
///
public class ProcessBuilderTests
{
+ ///
+ /// Process Events to be used when using
+ ///
+ public enum ProcessTestEvents
+ {
+ StartEvent,
+ MidProcessEvent,
+ EndEvent,
+ }
+
private const string ProcessName = "TestProcess";
private const string StepName = "TestStep";
private const string EventId = "TestEvent";
@@ -29,6 +40,20 @@ public void ProcessBuilderInitialization()
Assert.Empty(processBuilder.Steps);
}
+ ///
+ /// Tests the initialization of the ProcessBuilder.
+ ///
+ [Fact]
+ public void ProcessBuilderWithEventsInitialization()
+ {
+ // Arrange & Act
+ var processBuilder = new ProcessBuilder(ProcessName);
+
+ // Assert
+ Assert.Equal(ProcessName, processBuilder.Name);
+ Assert.Empty(processBuilder.Steps);
+ }
+
///
/// Tests the AddStepFromType method to ensure it adds a step correctly.
///
@@ -61,6 +86,7 @@ public void InvalidOperationExceptionOnAddStepWithSameStepName()
try
{
processBuilder.AddStepFromType(StepName);
+ Assert.Fail("Expected InvalidOperationException");
}
catch (InvalidOperationException ex)
{
@@ -103,6 +129,7 @@ public void InvalidOperationExceptionOnAddSubprocessWithSameStepName()
try
{
processBuilder.AddStepFromProcess(subProcessBuilder);
+ Assert.Fail("Expected InvalidOperationException");
}
catch (InvalidOperationException ex)
{
@@ -167,6 +194,159 @@ public void OnFunctionErrorCreatesEdgeBuilder()
Assert.EndsWith("Global.OnError", edgeBuilder.EventId);
}
+ ///
+ /// Verify that the fails when linking empty Event Subscriber class
+ ///
+ [Fact]
+ public void ProcessBuilderWithProcessEventsAndEmptyEventSubscriber()
+ {
+ // Arrange
+ var processBuilder = new ProcessBuilder(ProcessName);
+
+ // Act
+ try
+ {
+ processBuilder.LinkEventSubscribersFromType();
+ Assert.Fail("Expected InvalidOperationException");
+ }
+ catch (InvalidOperationException ex)
+ {
+ // Assert
+ Assert.Equal($"The Event Listener type {nameof(EmptyTestEventSubscriber)} has no functions to extract subscribe methods", ex.Message);
+ }
+ }
+
+ ///
+ /// Verify that the fails when linking Event Subscriber class
+ /// without
+ ///
+ [Fact]
+ public void ProcessBuilderWithProcessEventsAndEventSubscriberWithoutAnnotators()
+ {
+ // Arrange
+ var processBuilder = new ProcessBuilder(ProcessName);
+
+ // Act
+ try
+ {
+ processBuilder.LinkEventSubscribersFromType();
+ Assert.Fail("Expected InvalidOperationException");
+ }
+ catch (InvalidOperationException ex)
+ {
+ // Assert
+ Assert.Equal($"The Event Listener type {nameof(IncompleteTestEventSubscriber)} has functions with no ProcessEventSubscriber Annotations", ex.Message);
+ }
+ }
+
+ ///
+ /// Verify that the fails when linking Event Subscriber class
+ /// with process events that are not linked with
+ ///
+ [Fact]
+ public void ProcessBuilderWithProcessEventsAndMissingEventForEventSubscriber()
+ {
+ // Arrange
+ var processBuilder = new ProcessBuilder(ProcessName);
+ var repeaterA = processBuilder.AddStepFromType("repeaterA");
+ var repeaterB = processBuilder.AddStepFromType("repeaterB");
+ var repeaterC = processBuilder.AddStepFromType("repeaterC");
+
+ processBuilder
+ .OnInputEvent(ProcessTestEvents.StartEvent)
+ .SendEventTo(new ProcessFunctionTargetBuilder(repeaterA));
+
+ repeaterA
+ .OnEvent(RepeatTestStep.OutputEvent)
+ // intentionally not connecting EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessTestEvents.MidProcessEvent))
+ .SendEventTo(new ProcessFunctionTargetBuilder(repeaterB));
+
+ repeaterB
+ .OnEvent(RepeatTestStep.OutputEvent)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessTestEvents.EndEvent));
+
+ // Act
+ try
+ {
+ processBuilder.LinkEventSubscribersFromType();
+ Assert.Fail("Expected InvalidOperationException");
+ }
+ catch (InvalidOperationException ex)
+ {
+ // Assert
+ Assert.Equal($"Cannot link method {nameof(CompleteTestEventSubscriber.onMidProcessEventReceived)} to event {Enum.GetName(ProcessTestEvents.MidProcessEvent)}, must make use of EmitAsProcessEvent first or remove unused event from event subscriber.", ex.Message);
+ }
+ }
+
+ ///
+ /// Verify that the fails when linking Event Subscriber class twice
+ ///
+ [Fact]
+ public void ProcessBuilderWithProcessEventsAndLinkingTwice()
+ {
+ // Arrange
+ var processBuilder = new ProcessBuilder(ProcessName);
+ var repeaterA = processBuilder.AddStepFromType("repeaterA");
+ var repeaterB = processBuilder.AddStepFromType("repeaterB");
+ var repeaterC = processBuilder.AddStepFromType("repeaterC");
+
+ processBuilder
+ .OnInputEvent(ProcessTestEvents.StartEvent)
+ .SendEventTo(new ProcessFunctionTargetBuilder(repeaterA));
+
+ repeaterA
+ .OnEvent(RepeatTestStep.OutputEvent)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessTestEvents.MidProcessEvent))
+ .SendEventTo(new ProcessFunctionTargetBuilder(repeaterB));
+
+ repeaterB
+ .OnEvent(RepeatTestStep.OutputEvent)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessTestEvents.EndEvent));
+
+ processBuilder.LinkEventSubscribersFromType();
+
+ // Act
+ try
+ {
+ processBuilder.LinkEventSubscribersFromType();
+ Assert.Fail("Expected InvalidOperationException");
+ }
+ catch (InvalidOperationException ex)
+ {
+ // Assert
+ Assert.Equal("Already linked process to another event subscriber class", ex.Message);
+ }
+ }
+
+ ///
+ /// Verify that the succeeds when linking an event subscriber with matching events
+ ///
+ [Fact]
+ public void ProcessBuilderWithProcessEventsAndMatchingEventSubscriber()
+ {
+ // Arrange
+ var processBuilder = new ProcessBuilder(ProcessName);
+ var repeaterA = processBuilder.AddStepFromType("repeaterA");
+ var repeaterB = processBuilder.AddStepFromType("repeaterB");
+ var repeaterC = processBuilder.AddStepFromType("repeaterC");
+
+ processBuilder
+ .OnInputEvent(ProcessTestEvents.StartEvent)
+ .SendEventTo(new ProcessFunctionTargetBuilder(repeaterA));
+
+ repeaterA
+ .OnEvent(RepeatTestStep.OutputEvent)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessTestEvents.MidProcessEvent))
+ .SendEventTo(new ProcessFunctionTargetBuilder(repeaterB));
+
+ repeaterB
+ .OnEvent(RepeatTestStep.OutputEvent)
+ .EmitAsProcessEvent(processBuilder.GetProcessEvent(ProcessTestEvents.EndEvent));
+
+ // Act & Assert
+ processBuilder.LinkEventSubscribersFromType();
+ }
+
///
/// A class that represents a step for testing.
///
@@ -186,6 +366,27 @@ public void TestFunction()
}
}
+ ///
+ /// A class that represents a step for testing.
+ ///
+ private sealed class RepeatTestStep : KernelProcessStep
+ {
+ ///
+ /// The name of the step.
+ ///
+ public static string Name => "RepeatTestStep";
+ public static string OutputEvent => "OutputEvent";
+
+ ///
+ /// A method that represents a function for testing.
+ ///
+ [KernelFunction]
+ public async Task TestFunctionAsync(KernelProcessStepContext context, string response)
+ {
+ await context.EmitEventAsync(OutputEvent, response);
+ }
+ }
+
///
/// A class that represents a step for testing.
///
@@ -206,4 +407,35 @@ public void GlobalErrorHandler(Exception exception)
private sealed class TestState
{
}
+
+ private sealed class EmptyTestEventSubscriber : KernelProcessEventsSubscriber { }
+
+ private sealed class IncompleteTestEventSubscriber : KernelProcessEventsSubscriber
+ {
+ public void onMidProcessEventReceived(string result)
+ {
+ }
+
+ public void onEndEventReceived(string result)
+ {
+ }
+ }
+
+ private sealed class CompleteTestEventSubscriber : KernelProcessEventsSubscriber
+ {
+ public string? onMidEventValue = null;
+ public string? onEndEventValue = null;
+
+ [ProcessEventSubscriber(ProcessTestEvents.MidProcessEvent)]
+ public void onMidProcessEventReceived(string result)
+ {
+ this.onMidEventValue = result;
+ }
+
+ [ProcessEventSubscriber(ProcessTestEvents.EndEvent)]
+ public void onEndEventReceived(string result)
+ {
+ this.onEndEventValue = result;
+ }
+ }
}
diff --git a/dotnet/src/Experimental/Process.UnitTests/Core/ProcessStepBuilderTests.cs b/dotnet/src/Experimental/Process.UnitTests/Core/ProcessStepBuilderTests.cs
index 07c4802c8731..0d5e085f2ac7 100644
--- a/dotnet/src/Experimental/Process.UnitTests/Core/ProcessStepBuilderTests.cs
+++ b/dotnet/src/Experimental/Process.UnitTests/Core/ProcessStepBuilderTests.cs
@@ -91,7 +91,7 @@ public void LinkToShouldAddEdge()
{
// Arrange
var stepBuilder = new TestProcessStepBuilder("TestStep");
- var edgeBuilder = new ProcessStepEdgeBuilder(stepBuilder, "TestEvent");
+ var edgeBuilder = new ProcessStepEdgeBuilder(stepBuilder, "TestEvent", "TestEvent");
// Act
stepBuilder.LinkTo("TestEvent", edgeBuilder);
diff --git a/dotnet/src/Experimental/Process.UnitTests/Core/ProcessStepEdgeBuilderTests.cs b/dotnet/src/Experimental/Process.UnitTests/Core/ProcessStepEdgeBuilderTests.cs
index 3e3f128e1753..35d81ef1ca97 100644
--- a/dotnet/src/Experimental/Process.UnitTests/Core/ProcessStepEdgeBuilderTests.cs
+++ b/dotnet/src/Experimental/Process.UnitTests/Core/ProcessStepEdgeBuilderTests.cs
@@ -21,7 +21,7 @@ public void ConstructorShouldInitializeProperties()
var eventType = "Event1";
// Act
- var builder = new ProcessStepEdgeBuilder(source, eventType);
+ var builder = new ProcessStepEdgeBuilder(source, eventType, eventType);
// Assert
Assert.Equal(source, builder.Source);
@@ -36,7 +36,7 @@ public void SendEventToShouldSetOutputTarget()
{
// Arrange
var source = new ProcessStepBuilder(TestStep.Name);
- var builder = new ProcessStepEdgeBuilder(source, "Event1");
+ var builder = new ProcessStepEdgeBuilder(source, "Event1", "Event1");
var outputTarget = new ProcessFunctionTargetBuilder(new ProcessStepBuilder("OutputStep"));
// Act
@@ -54,7 +54,7 @@ public void SendEventToShouldSetMultipleOutputTargets()
{
// Arrange
var source = new ProcessStepBuilder(TestStep.Name);
- var builder = new ProcessStepEdgeBuilder(source, "Event1");
+ var builder = new ProcessStepEdgeBuilder(source, "Event1", "Event1");
var outputTargetA = new ProcessFunctionTargetBuilder(new ProcessStepBuilder("StepA"));
var outputTargetB = new ProcessFunctionTargetBuilder(new ProcessStepBuilder("StepB"));
@@ -75,7 +75,7 @@ public void SendEventToShouldThrowIfOutputTargetAlreadySet()
{
// Arrange
var source = new ProcessStepBuilder(TestStep.Name);
- var builder = new ProcessStepEdgeBuilder(source, "Event1");
+ var builder = new ProcessStepEdgeBuilder(source, "Event1", "Event1");
var outputTarget1 = new ProcessFunctionTargetBuilder(source);
var outputTarget2 = new ProcessFunctionTargetBuilder(source);
@@ -94,7 +94,7 @@ public void StopProcessShouldSetOutputTargetToEndStep()
{
// Arrange
var source = new ProcessStepBuilder(TestStep.Name);
- var builder = new ProcessStepEdgeBuilder(source, "Event1");
+ var builder = new ProcessStepEdgeBuilder(source, "Event1", "Event1");
// Act
builder.StopProcess();
@@ -111,7 +111,7 @@ public void StopProcessShouldThrowIfOutputTargetAlreadySet()
{
// Arrange
var source = new ProcessStepBuilder(TestStep.Name);
- var builder = new ProcessStepEdgeBuilder(source, "Event1");
+ var builder = new ProcessStepEdgeBuilder(source, "Event1", "Event1");
var outputTarget = new ProcessFunctionTargetBuilder(source);
// Act
@@ -129,7 +129,7 @@ public void BuildShouldReturnKernelProcessEdge()
{
// Arrange
var source = new ProcessStepBuilder(TestStep.Name);
- var builder = new ProcessStepEdgeBuilder(source, "Event1");
+ var builder = new ProcessStepEdgeBuilder(source, "Event1", "Event1");
var outputTarget = new ProcessFunctionTargetBuilder(source);
builder.SendEventTo(outputTarget);
diff --git a/dotnet/src/Experimental/Process.Utilities.UnitTests/CloneTests.cs b/dotnet/src/Experimental/Process.Utilities.UnitTests/CloneTests.cs
index fb0c4764b081..31f07fd273ec 100644
--- a/dotnet/src/Experimental/Process.Utilities.UnitTests/CloneTests.cs
+++ b/dotnet/src/Experimental/Process.Utilities.UnitTests/CloneTests.cs
@@ -158,16 +158,18 @@ private static void VerifyProcess(KernelProcess expected, KernelProcess actual)
}
}
- private static Dictionary> CreateTestEdges() =>
- new()
+ private static Dictionary> CreateTestEdges()
+ {
+ return new()
{
{
"sourceId",
[
- new KernelProcessEdge("sourceId", new KernelProcessFunctionTarget("sourceId", "targetFunction", "targetParameter", "targetEventId")),
+ new KernelProcessEdge("sourceId", new KernelProcessFunctionTarget("sourceId", "targetFunction", "targetParameter", "targetEventId"), "eventName", "eventId"),
]
}
};
+ }
private sealed record TestState
{
diff --git a/dotnet/src/InternalUtilities/process/Runtime/MapExtensions.cs b/dotnet/src/InternalUtilities/process/Runtime/MapExtensions.cs
index 8c2e87f37d91..070439a9383b 100644
--- a/dotnet/src/InternalUtilities/process/Runtime/MapExtensions.cs
+++ b/dotnet/src/InternalUtilities/process/Runtime/MapExtensions.cs
@@ -30,7 +30,7 @@ public static (IEnumerable, KernelProcess, string) Initialize(this KernelProcess
new KernelProcess(
new KernelProcessState($"Map{map.Operation.State.Name}", map.Operation.State.Version, proxyId),
[map.Operation],
- new() { { ProcessConstants.MapEventId, [new KernelProcessEdge(proxyId, new KernelProcessFunctionTarget(map.Operation.State.Id!, message.FunctionName, parameterName))] } });
+ new() { { ProcessConstants.MapEventId, [new KernelProcessEdge(proxyId, new KernelProcessFunctionTarget(map.Operation.State.Id!, message.FunctionName, parameterName), startEventId, startEventId)] } });
}
return (inputValues, mapOperation, startEventId);
diff --git a/dotnet/src/InternalUtilities/process/Runtime/ProcessMessage.cs b/dotnet/src/InternalUtilities/process/Runtime/ProcessMessage.cs
index 6b7a73c57a15..92b15a5495f3 100644
--- a/dotnet/src/InternalUtilities/process/Runtime/ProcessMessage.cs
+++ b/dotnet/src/InternalUtilities/process/Runtime/ProcessMessage.cs
@@ -10,12 +10,16 @@ namespace Microsoft.SemanticKernel.Process.Runtime;
///
/// Initializes a new instance of the class.
///
+/// Original name of the name of the event triggered
+/// Original name of the name of the event triggered
/// The source identifier of the message.
/// The destination identifier of the message.
/// The name of the function associated with the message.
/// The dictionary of values associated with the message.
[KnownType(typeof(KernelProcessError))]
public record ProcessMessage(
+ string EventName,
+ string EventId,
string SourceId,
string DestinationId,
string FunctionName,
diff --git a/dotnet/src/InternalUtilities/process/Runtime/ProcessMessageFactory.cs b/dotnet/src/InternalUtilities/process/Runtime/ProcessMessageFactory.cs
index f1bcea825c22..2b706e8b39bf 100644
--- a/dotnet/src/InternalUtilities/process/Runtime/ProcessMessageFactory.cs
+++ b/dotnet/src/InternalUtilities/process/Runtime/ProcessMessageFactory.cs
@@ -24,7 +24,7 @@ internal static ProcessMessage CreateFromEdge(KernelProcessEdge edge, object? da
parameterValue.Add(target.ParameterName!, data);
}
- ProcessMessage newMessage = new(edge.SourceStepId, target.StepId, target.FunctionName, parameterValue)
+ ProcessMessage newMessage = new(edge.SourceEventName, edge.SourceEventId, edge.SourceStepId, target.StepId, target.FunctionName, parameterValue)
{
TargetEventId = target.TargetEventId,
TargetEventData = data
diff --git a/python/samples/demos/dapr_client/requirements.txt b/python/samples/demos/dapr_client/requirements.txt
new file mode 100644
index 000000000000..85c1fc60dec1
--- /dev/null
+++ b/python/samples/demos/dapr_client/requirements.txt
@@ -0,0 +1,2 @@
+dapr==1.14.0
+dapr-ext-grpc==1.14.0
\ No newline at end of file
diff --git a/python/samples/demos/dapr_client/subscriber.py b/python/samples/demos/dapr_client/subscriber.py
new file mode 100644
index 000000000000..05a6e948c0e9
--- /dev/null
+++ b/python/samples/demos/dapr_client/subscriber.py
@@ -0,0 +1,121 @@
+# Based from: https://github.com/dapr/python-sdk/tree/main/examples/pubsub-simple
+# ====================================================================================
+# SETUP:
+# 1. [Optional] Create python virtual environment and activate
+# 2. Install dependencies: python -m pip install -r ./requirements.txt
+# USAGE:
+# To run use:
+# dapr run --app-id python-subscriber --app-protocol grpc --app-port 50051 python subscriber.py
+# To stop app use:
+# dapr stop --app-id python-subscriber
+# For testing (from a separate terminal):
+# dapr publish --publish-app-id python-subscriber --pubsub pubsub --topic someotherkickofftopicname --data '{"orderId": "100"}'
+# dapr publish --publish-app-id python-subscriber --pubsub pubsub --topic sharedeventtopic --data '{"id": "100", "message": "test"}'
+# ====================================================================================
+
+from datetime import datetime, timezone
+from time import sleep
+from cloudevents.sdk.event import v1
+from dapr.ext.grpc import App
+from dapr.clients.grpc._response import TopicEventResponse
+from dapr.proto import appcallback_v1
+
+import json
+
+PUBSUB_NAME = "pubsub"
+SUB_TOPIC_1 = "someotherkickofftopicname"
+SUB_TOPIC_2 = "OnStepACompleted"
+SUB_TOPIC_3 = "OnStepBCompleted"
+SUB_TOPIC_4 = "sharedeventtopic"
+
+app = App()
+should_retry = False # To control whether dapr should retry sending a message
+
+def get_timestamp():
+ tz = timezone.utc
+ ft = "%Y-%m-%dT%H:%M:%SZ"
+ return datetime.now(tz=tz).strftime(ft)
+
+def process_message(event: v1.Event, topic_name: str):
+ data = json.loads(event.Data())
+ print(
+ f'{get_timestamp()} - Subscriber TOPIC {topic_name} received: {data}", '
+ f'content_type="{event.content_type}"',
+ flush=True,
+ )
+
+@app.subscribe(pubsub_name=PUBSUB_NAME, topic=SUB_TOPIC_1)
+def on_topic_1(event: v1.Event) -> TopicEventResponse:
+ global should_retry
+ process_message(event, SUB_TOPIC_1)
+ # event.Metadata() contains a dictionary of cloud event extensions and publish metadata
+ if should_retry:
+ should_retry = False # we only retry once in this example
+ sleep(0.5) # add some delay to help with ordering of expected logs
+ return TopicEventResponse('retry')
+ return TopicEventResponse('success')
+
+@app.subscribe(pubsub_name=PUBSUB_NAME, topic=SUB_TOPIC_2)
+def on_topic_2(event: v1.Event) -> TopicEventResponse:
+ global should_retry
+ process_message(event, SUB_TOPIC_2)
+ # event.Metadata() contains a dictionary of cloud event extensions and publish metadata
+ if should_retry:
+ should_retry = False # we only retry once in this example
+ sleep(0.5) # add some delay to help with ordering of expected logs
+ return TopicEventResponse('retry')
+ return TopicEventResponse('success')
+
+@app.subscribe(pubsub_name=PUBSUB_NAME, topic=SUB_TOPIC_3)
+def on_topic_3(event: v1.Event) -> TopicEventResponse:
+ global should_retry
+ process_message(event, SUB_TOPIC_3)
+ # event.Metadata() contains a dictionary of cloud event extensions and publish metadata
+ if should_retry:
+ should_retry = False # we only retry once in this example
+ sleep(0.5) # add some delay to help with ordering of expected logs
+ return TopicEventResponse('retry')
+ return TopicEventResponse('success')
+
+
+@app.subscribe(pubsub_name=PUBSUB_NAME, topic=SUB_TOPIC_4)
+def on_topic_4(event: v1.Event) -> TopicEventResponse:
+ global should_retry
+ process_message(event, SUB_TOPIC_4)
+ # event.Metadata() contains a dictionary of cloud event extensions and publish metadata
+ if should_retry:
+ should_retry = False # we only retry once in this example
+ sleep(0.5) # add some delay to help with ordering of expected logs
+ return TopicEventResponse('retry')
+ return TopicEventResponse('success')
+
+# == for testing with Redis only ==
+# workaround as redis pubsub does not support wildcards
+# we manually register the distinct topics
+for id in range(4, 7):
+ app._servicer._registered_topics.append(
+ appcallback_v1.TopicSubscription(pubsub_name=PUBSUB_NAME, topic=f'topic/{id}')
+ )
+# =================================
+
+
+# this allows subscribing to all events sent to this app - useful for wildcard topics
+@app.subscribe(pubsub_name=PUBSUB_NAME, topic='topic/#', disable_topic_validation=True)
+def mytopic_wildcard(event: v1.Event) -> TopicEventResponse:
+ data = json.loads(event.Data())
+ print(
+ f'Wildcard-Subscriber received: id={data["id"]}, message="{data["message"]}", '
+ f'content_type="{event.content_type}"',
+ flush=True,
+ )
+ return TopicEventResponse('success')
+
+
+# Example of an unhealthy status
+# def unhealthy():
+# raise ValueError("Not healthy")
+# app.register_health_check(unhealthy)
+
+app.register_health_check(lambda: print('Healthy'))
+
+app.run(50051)