Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.NET combine Handle and CallHandler #4386

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ public interface IAgentBase
AgentId AgentId { get; }
IAgentRuntime Context { get; }

// Methods
Task CallHandler(CloudEvent item);
Task<RpcResponse> HandleRequest(RpcRequest request);
void ReceiveMessage(Message message);
Task StoreAsync(AgentState state, CancellationToken cancellationToken = default);
Task<T> ReadAsync<T>(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new();
ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default);
ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is used in other places (eg App.cs) why remove it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IAgentBase is already deleted by recent PR. So this change no longer apply.

Anyway, the idea of moving the overloaded PublishEventAsync as an extension method is to simplify the interface definition

List<string> Subscribe(string topic);
}
75 changes: 30 additions & 45 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace Microsoft.AutoGen.Agents;

public abstract class AgentBase : IAgentBase, IHandle
public abstract class AgentBase : IAgentBase, IHandle, IHandle<CloudEvent>
{
public static readonly ActivitySource s_source = new("AutoGen.Agent");
public AgentId AgentId => _runtime.AgentId;
Expand Down Expand Up @@ -106,7 +106,7 @@ protected internal async Task HandleRpcMessage(Message msg, CancellationToken ca
{
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Metadata);
await this.InvokeWithActivityAsync(
static ((AgentBase Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.CallHandler(state.Item),
static ((AgentBase Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.HandleObject(state.Item),
(this, msg.CloudEvent),
activity,
msg.CloudEvent.Type, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -255,33 +255,43 @@ static async ((AgentBase Agent, CloudEvent Event) state, CancellationToken ct) =
item.Type, cancellationToken).ConfigureAwait(false);
}

public Task CallHandler(CloudEvent item)
public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" });

public virtual Task HandleObject(object item)
{
if (item is CloudEvent ce)
{
return Handle(ce);
}

var genericInterfaceType = typeof(IHandle<>).MakeGenericType(item.GetType());

// check that our target actually implements this interface, otherwise call the default static
if (genericInterfaceType.IsAssignableFrom(this.GetType()))
{
var methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance)
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}");

return methodInfo.Invoke(this, [item]) as Task ?? throw new InvalidOperationException("Method did not return a Task");
}

// otherwise, complain
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}");
}

public virtual Task Handle(CloudEvent item)
{
// Only send the event to the handler if the agent type is handling that type
// foreach of the keys in the EventTypes.EventsMap[] if it contains the item.type
foreach (var key in EventTypes.EventsMap.Keys)
{
if (EventTypes.EventsMap[key].Contains(item.Type))
{
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry);
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]);
var genericInterfaceType = typeof(IHandle<>).MakeGenericType(EventTypes.Types[item.Type]);

MethodInfo methodInfo;
try
{
// check that our target actually implements this interface, otherwise call the default static
if (genericInterfaceType.IsAssignableFrom(this.GetType()))
{
methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance)
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}");
return methodInfo.Invoke(this, [payload]) as Task ?? Task.CompletedTask;
}
else
{
// The error here is we have registered for an event that we do not have code to listen to
throw new InvalidOperationException($"No handler found for event '{item.Type}'; expecting IHandle<{item.Type}> implementation.");
}
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry);
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]);
return this.HandleObject(convertedPayload);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand - with this all agents can handle all messages? I don't think that's what you want..... Maybe a quick call to discuss because I'm not understanding it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HandleObject will check if the agent has the right handler to be invoked. If there's no right handler a "Method not found on type {genericInterfaceType.FullName}" will be raised.

}
catch (Exception ex)
{
Expand All @@ -293,29 +303,4 @@ public Task CallHandler(CloudEvent item)

return Task.CompletedTask;
}

public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" });

//TODO: should this be async and cancellable?
public virtual Task HandleObject(object item)
{
// get all Handle<T> methods
var handleTMethods = this.GetType().GetMethods().Where(m => m.Name == "Handle" && m.GetParameters().Length == 1).ToList();

// get the one that matches the type of the item
var handleTMethod = handleTMethods.FirstOrDefault(m => m.GetParameters()[0].ParameterType == item.GetType());

// if we found one, invoke it
if (handleTMethod != null)
{
return (Task)handleTMethod.Invoke(this, [item])!;
}

// otherwise, complain
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}");
}
public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default)
{
await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}
}
7 changes: 7 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// AgentBaseExtensions.cs

using System.Diagnostics;
using Google.Protobuf;
using Microsoft.AutoGen.Abstractions;

namespace Microsoft.AutoGen.Agents;

Expand Down Expand Up @@ -112,4 +114,9 @@ public static async Task InvokeWithActivityAsync<TState>(this AgentBase agent, F
activity?.Stop();
}
}

public static async ValueTask PublishEventAsync(this AgentBase agent, string topic, IMessage evt, CancellationToken cancellationToken = default)
{
await agent.PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}
}
Loading