Skip to content

Commit

Permalink
add alternater_name decorator
Browse files Browse the repository at this point in the history
Signed-off-by: Mukundan Sundararajan <[email protected]>
  • Loading branch information
mukundansundar committed Dec 20, 2023
1 parent 2688f3c commit 29bcbb7
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 42 deletions.
5 changes: 3 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""

# Import your main classes here
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
Expand All @@ -28,5 +28,6 @@
'WorkflowState',
'WorkflowStatus',
'when_all',
'when_any'
'when_any',
'alternate_name'
]
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def schedule_new_workflow(self, workflow: Workflow, *, input: Optional[TInput] =
Returns:
The ID of the scheduled workflow instance.
"""
if hasattr(workflow, '_registered_name'):
return self.__obj.schedule_new_orchestration(workflow.__dict__['_registered_name'],
if hasattr(workflow, '_alternate_name'):
return self.__obj.schedule_new_orchestration(workflow.__dict__['_alternate_name'],
input=input, instance_id=instance_id,
start_at=start_at)
return self.__obj.schedule_new_orchestration(workflow.__name__, input=input,
Expand Down
11 changes: 6 additions & 5 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:

def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
input: TInput = None) -> task.Task[TOutput]:
if hasattr(activity, '_registered_name'):
return self.__obj.call_activity(activity=activity.__dict__['_registered_name'],
if hasattr(activity, '_alternate_name'):
return self.__obj.call_activity(activity=activity.__dict__['_alternate_name'],
input=input)
# this return should ideally never execute
return self.__obj.call_activity(activity=activity.__name__, input=input)

def call_child_workflow(self, workflow: Workflow, *,
Expand All @@ -66,10 +67,10 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
return workflow(daprWfContext, inp)
# copy workflow name so durabletask.worker can find the orchestrator in its registry

# Any workflow function using python decorator will have a _registered_name attribute
if hasattr(workflow, '_registered_name'):
wf.__name__ = workflow.__dict__['_registered_name']
if hasattr(workflow, '_alternate_name'):
wf.__name__ = workflow.__dict__['_alternate_name']
else:
# this case should ideally never happen
wf.__name__ = workflow.__name__
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)

Expand Down
96 changes: 78 additions & 18 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,20 @@ def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] =
return fn(daprWfContext)
return fn(daprWfContext, inp)

if hasattr(fn, '_registered_name'):
raise ValueError(f'Workflow {fn.__name__} already registered as {fn._registered_name}')
fn.__dict__['_registered_name'] = name if name else fn.__name__

if name:
self.__worker._registry.add_named_orchestrator(name, orchestrationWrapper)
return
self.__worker._registry.add_named_orchestrator(fn.__name__, orchestrationWrapper)
if hasattr(fn, '_workflow_registered'):
# whenever a workflow is registered, it also has _alternate_name attribute
alt_name = fn.__dict__['_alternate_name']
raise ValueError(f'Workflow {fn.__name__} already registered as {alt_name}')
if hasattr(fn, '_alternate_name'):
if name is not None:
m = f'Workflow {fn.__name__} already has an alternate name {fn._alternate_name}'
raise ValueError(m)
else:
fn.__dict__['_alternate_name'] = name if name else fn.__name__

self.__worker._registry.add_named_orchestrator(fn.__dict__['_alternate_name'],
orchestrationWrapper)
fn.__dict__['_workflow_registered'] = True

def register_activity(self, fn: Activity, *, name: Optional[str] = None):
"""Registers a workflow activity as a function that takes
Expand All @@ -80,14 +86,20 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
return fn(wfActivityContext)
return fn(wfActivityContext, inp)

if hasattr(fn, '_registered_name'):
raise ValueError(f'Activity {fn.__name__} already registered as {fn._registered_name}')
fn.__dict__['_registered_name'] = name if name else fn.__name__
if hasattr(fn, '_activity_registered'):
# whenever an activity is registered, it also has _alternate_name attribute
alt_name = fn.__dict__['_alternate_name']
raise ValueError(f'Activity {fn.__name__} already registered as {alt_name}')
if hasattr(fn, '_alternate_name'):
if name is not None:
m = f'Activity {fn.__name__} already has an alternate name {fn._alternate_name}'
raise ValueError(m)
else:
fn.__dict__['_alternate_name'] = name if name else fn.__name__


if name:
self.__worker._registry.add_named_activity(name, activityWrapper)
return
self.__worker._registry.add_named_activity(fn.__name__, activityWrapper)
self.__worker._registry.add_named_activity(fn.__dict__['_alternate_name'], activityWrapper)
fn.__dict__['_activity_registered'] = True

def start(self):
"""Starts the listening for work items on a background thread."""
Expand Down Expand Up @@ -129,8 +141,10 @@ def wrapper(fn: Workflow):
@wraps(fn)
def innerfn():
return fn

innerfn.__dict__['_registered_name'] = name if name else fn.__name__
if hasattr(fn, '_alternate_name'):
innerfn.__dict__['_alternate_name'] = fn.__dict__['_alternate_name']
else:
innerfn.__dict__['_alternate_name'] = name if name else fn.__name__
innerfn.__signature__ = inspect.signature(fn)
return innerfn

Expand Down Expand Up @@ -174,7 +188,10 @@ def wrapper(fn: Activity):
def innerfn():
return fn

innerfn.__dict__['_registered_name'] = name if name else fn.__name__
if hasattr(fn, '_alternate_name'):
innerfn.__dict__['_alternate_name'] = fn.__dict__['_alternate_name']
else:
innerfn.__dict__['_alternate_name'] = name if name else fn.__name__
innerfn.__signature__ = inspect.signature(fn)
return innerfn

Expand All @@ -184,3 +201,46 @@ def innerfn():
return wrapper(__fn)

return wrapper

def alternate_name(name: Optional[str] = None):
"""Decorator to register a workflow or activity function with an alternate name.
This example shows how to register a workflow function with a name:
from dapr.ext.workflow import WorkflowRuntime
wfr = WorkflowRuntime()
@wfr.workflow
@alternate_name(add")
def add(ctx, x: int, y: int) -> int:
return x + y
This example shows how to register an activity function with a name:
from dapr.ext.workflow import WorkflowRuntime
wfr = WorkflowRuntime()
@wfr.activity
@alternate_name("add")
def add(ctx, x: int, y: int) -> int:
return x + y
Args:
name (Optional[str], optional): Name to identify the workflow or activity function as in
the workflow runtime. Defaults to None.
"""

def wrapper(fn: any):
if hasattr(fn, '_alternate_name'):
raise ValueError(f'Function {fn.__name__} already has an alternate name {fn._alternate_name}')
fn.__dict__['_alternate_name'] = name if name else fn.__name__

@wraps(fn)
def innerfn(*args, **kwargs):
return fn(*args, **kwargs)

innerfn.__dict__['_alternate_name'] = name if name else fn.__name__
innerfn.__signature__ = inspect.signature(fn)
return innerfn

return wrapper
58 changes: 43 additions & 15 deletions ext/dapr-ext-workflow/tests/test_workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

from typing import List
import unittest
import logging
import inspect
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
from unittest import mock
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext

listOrchestrators: List[str] = []
Expand All @@ -35,14 +37,20 @@ def add_named_activity(self, name: str, fn):
class WorkflowRuntimeTest(unittest.TestCase):

def setUp(self):
self.log = logging.getLogger(__name__)
listActivities.clear()
listOrchestrators.clear()
mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start()
self.runtime_options = WorkflowRuntime()
if hasattr(self.mock_client_wf, "_registered_name"):
del self.mock_client_wf.__dict__["_registered_name"]
if hasattr(self.mock_client_activity, "_registered_name"):
del self.mock_client_activity.__dict__["_registered_name"]
if hasattr(self.mock_client_wf, "_alternate_name"):
del self.mock_client_wf.__dict__["_alternate_name"]
if hasattr(self.mock_client_activity, "_alternate_name"):
del self.mock_client_activity.__dict__["_alternate_name"]
if hasattr(self.mock_client_wf, "_workflow_registered"):
del self.mock_client_wf.__dict__["_workflow_registered"]
if hasattr(self.mock_client_activity, "_activity_registered"):
del self.mock_client_activity.__dict__["_activity_registered"]


def mock_client_wf(ctx: DaprWorkflowContext, input):
print(f'{input}')
Expand All @@ -54,39 +62,46 @@ def test_register(self):
self.runtime_options.register_workflow(self.mock_client_wf, name="mock_client_wf")
wanted_orchestrator = [self.mock_client_wf.__name__]
assert listOrchestrators == wanted_orchestrator
assert self.mock_client_wf._alternate_name == "mock_client_wf"
assert self.mock_client_wf._workflow_registered

self.runtime_options.register_activity(self.mock_client_activity)
wanted_activity = [self.mock_client_activity.__name__]
assert listActivities == wanted_activity
assert self.mock_client_activity._activity_registered

def test_decorator_register(self):
client_wf = (self.runtime_options.workflow())(self.mock_client_wf)
wanted_orchestrator = [self.mock_client_wf.__name__]
assert listOrchestrators == wanted_orchestrator
assert client_wf._registered_name == self.mock_client_wf.__name__
assert client_wf._alternate_name == self.mock_client_wf.__name__
assert self.mock_client_wf._workflow_registered

client_activity = (self.runtime_options.activity())(self.mock_client_activity)
wanted_activity = [self.mock_client_activity.__name__]
assert listActivities == wanted_activity
assert client_activity._registered_name == self.mock_client_activity.__name__
assert client_activity._alternate_name == self.mock_client_activity.__name__
assert self.mock_client_activity._activity_registered

def test_both_decorator_and_register(self):
client_wf = (self.runtime_options.workflow(name="test_wf"))(self.mock_client_wf)
wanted_orchestrator = ["test_wf"]
assert listOrchestrators == wanted_orchestrator
assert client_wf._registered_name == "test_wf"
assert client_wf._alternate_name == "test_wf"
assert self.mock_client_wf._workflow_registered

self.runtime_options.register_activity(self.mock_client_activity, name="test_act")
wanted_activity = ["test_act"]
assert listActivities == wanted_activity
assert hasattr(self.mock_client_activity, "_registered_name")
assert hasattr(self.mock_client_activity, "_alternate_name")
assert self.mock_client_activity._activity_registered

def test_register_wf_act_using_both_decorator_and_method(self):
client_wf = (self.runtime_options.workflow(name="test_wf"))(self.mock_client_wf)

wanted_orchestrator = ["test_wf"]
assert listOrchestrators == wanted_orchestrator
assert client_wf._registered_name == "test_wf"
assert client_wf._alternate_name == "test_wf"
with self.assertRaises(ValueError) as exeception_context:
self.runtime_options.register_workflow(self.mock_client_wf)
wf_name = self.mock_client_wf.__name__
Expand All @@ -96,19 +111,32 @@ def test_register_wf_act_using_both_decorator_and_method(self):
client_act = (self.runtime_options.activity(name="test_act"))(self.mock_client_activity)
wanted_activity = ["test_act"]
assert listActivities == wanted_activity
assert client_act._registered_name == "test_act"
assert client_act._alternate_name == "test_act"
with self.assertRaises(ValueError) as exeception_context:
self.runtime_options.register_activity(self.mock_client_activity)
act_name = self.mock_client_activity.__name__
self.assertEqual(exeception_context.exception.args[0],
f'Activity {act_name} already registered as test_act')

def test_duplicate_alternate_name_registration(self):
client_wf = (alternate_name(name="test"))(self.mock_client_wf)
with self.assertRaises(ValueError) as exeception_context:
(self.runtime_options.workflow(name="random"))(client_wf)
self.assertEqual(exeception_context.exception.args[0],
f'Workflow {client_wf.__name__} already has an alternate name test')

client_act = (alternate_name(name="test"))(self.mock_client_activity)
with self.assertRaises(ValueError) as exeception_context:
(self.runtime_options.activity(name="random"))(client_act)
self.assertEqual(exeception_context.exception.args[0],
f'Activity {client_act.__name__} already has an alternate name test')

def test_register_wf_act_using_both_decorator_and_method_without_name(self):
client_wf = (self.runtime_options.workflow())(self.mock_client_wf)

wanted_orchestrator = ["mock_client_wf"]
assert listOrchestrators == wanted_orchestrator
assert client_wf._registered_name == "mock_client_wf"
assert client_wf._alternate_name == "mock_client_wf"
with self.assertRaises(ValueError) as exeception_context:
self.runtime_options.register_workflow(self.mock_client_wf, name="test_wf")
wf_name = self.mock_client_wf.__name__
Expand All @@ -118,7 +146,7 @@ def test_register_wf_act_using_both_decorator_and_method_without_name(self):
client_act = (self.runtime_options.activity())(self.mock_client_activity)
wanted_activity = ["mock_client_activity"]
assert listActivities == wanted_activity
assert client_act._registered_name == "mock_client_activity"
assert client_act._alternate_name == "mock_client_activity"
with self.assertRaises(ValueError) as exeception_context:
self.runtime_options.register_activity(self.mock_client_activity, name="test_act")
act_name = self.mock_client_activity.__name__
Expand All @@ -129,9 +157,9 @@ def test_decorator_register_optinal_name(self):
client_wf = (self.runtime_options.workflow(name="test_wf"))(self.mock_client_wf)
wanted_orchestrator = ["test_wf"]
assert listOrchestrators == wanted_orchestrator
assert client_wf._registered_name == "test_wf"
assert client_wf._alternate_name == "test_wf"

client_act = (self.runtime_options.activity(name="test_act"))(self.mock_client_activity)
wanted_activity = ["test_act"]
assert listActivities == wanted_activity
assert client_act._registered_name == "test_act"
assert client_act._alternate_name == "test_act"

0 comments on commit 29bcbb7

Please sign in to comment.