From 26c5bfb3a31e427aa2d098db463abc6a30b2e004 Mon Sep 17 00:00:00 2001 From: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> Date: Tue, 9 Jan 2024 12:31:15 -0800 Subject: [PATCH] Initial code for decorators and optional naming of workflows and activities (#651) * initial code for decorators and optional naming of workflows and activities Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> * add alternater_name decorator Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> * fix linter Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> * address review comments. Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> * address review comments. Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> --------- Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> --- README.md | 1 + examples/workflow/README.md | 39 ++++ examples/workflow/child_workflow.py | 16 +- examples/workflow/fan_out_fan_in.py | 16 +- examples/workflow/human_approval.py | 15 +- examples/workflow/monitor.py | 15 +- examples/workflow/task_chaining.py | 21 ++- .../dapr/ext/workflow/__init__.py | 3 +- .../dapr/ext/workflow/dapr_workflow_client.py | 7 + .../ext/workflow/dapr_workflow_context.py | 12 +- .../dapr/ext/workflow/workflow_runtime.py | 173 +++++++++++++++++- .../tests/test_workflow_runtime.py | 140 +++++++++++++- setup.cfg | 4 +- tox.ini | 1 + 14 files changed, 410 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index c160af33d..6b46b6381 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,7 @@ cd python-sdk pip3 install -e . pip3 install -e ./ext/dapr-ext-grpc/ pip3 install -e ./ext/dapr-ext-fastapi/ +pip3 install -e ./ext/dapr-ext-workflow/ ``` 3. Install required packages diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 769d6820f..63208c8e3 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -22,10 +22,20 @@ Each of the examples in this directory can be run directly from the command line ### Task Chaining This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command: + ```sh dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py ``` + The output of this example should look like this: @@ -41,9 +51,38 @@ The output of this example should look like this: This example demonstrates how to fan-out a workflow into multiple parallel tasks, and then fan-in the results of those tasks. You can run this sample using the following command: + + ```sh dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py ``` + The output of this sample should look like this: diff --git a/examples/workflow/child_workflow.py b/examples/workflow/child_workflow.py index f46ce413c..dccaa631b 100644 --- a/examples/workflow/child_workflow.py +++ b/examples/workflow/child_workflow.py @@ -13,12 +13,15 @@ import dapr.ext.workflow as wf import time +wfr = wf.WorkflowRuntime() + +@wfr.workflow def main_workflow(ctx: wf.DaprWorkflowContext): try: instance_id = ctx.instance_id child_instance_id = instance_id + '-child' - print(f'*** Calling child workflow {child_instance_id}') + print(f'*** Calling child workflow {child_instance_id}', flush=True) yield ctx.call_child_workflow( workflow=child_workflow, input=None, instance_id=child_instance_id ) @@ -28,16 +31,15 @@ def main_workflow(ctx: wf.DaprWorkflowContext): return +@wfr.workflow def child_workflow(ctx: wf.DaprWorkflowContext): instance_id = ctx.instance_id - print(f'*** Child workflow {instance_id} called') + print(f'*** Child workflow {instance_id} called', flush=True) if __name__ == '__main__': - workflowRuntime = wf.WorkflowRuntime('localhost', '50001') - workflowRuntime.register_workflow(main_workflow) - workflowRuntime.register_workflow(child_workflow) - workflowRuntime.start() + wfr.start() + time.sleep(10) # wait for workflow runtime to start wf_client = wf.DaprWorkflowClient() instance_id = wf_client.schedule_new_workflow(workflow=main_workflow) @@ -45,4 +47,4 @@ def child_workflow(ctx: wf.DaprWorkflowContext): # Wait for the workflow to complete time.sleep(5) - workflowRuntime.shutdown() + wfr.shutdown() diff --git a/examples/workflow/fan_out_fan_in.py b/examples/workflow/fan_out_fan_in.py index ebab0dc6b..e5799862f 100644 --- a/examples/workflow/fan_out_fan_in.py +++ b/examples/workflow/fan_out_fan_in.py @@ -14,7 +14,10 @@ from typing import List import dapr.ext.workflow as wf +wfr = wf.WorkflowRuntime() + +@wfr.workflow(name='batch_processing') def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): # get a batch of N work items to process in parallel work_batch = yield ctx.call_activity(get_work_batch, input=wf_input) @@ -30,10 +33,12 @@ def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): yield ctx.call_activity(process_results, input=total) +@wfr.activity(name='get_batch') def get_work_batch(ctx, batch_size: int) -> List[int]: return [i + 1 for i in range(batch_size)] +@wfr.activity def process_work_item(ctx, work_item: int) -> int: print(f'Processing work item: {work_item}.') time.sleep(5) @@ -42,21 +47,18 @@ def process_work_item(ctx, work_item: int) -> int: return result +@wfr.activity(name='final_process') def process_results(ctx, final_result: int): print(f'Final result: {final_result}.') if __name__ == '__main__': - workflowRuntime = wf.WorkflowRuntime('localhost', '50001') - workflowRuntime.register_workflow(batch_processing_workflow) - workflowRuntime.register_activity(get_work_batch) - workflowRuntime.register_activity(process_work_item) - workflowRuntime.register_activity(process_results) - workflowRuntime.start() + wfr.start() + time.sleep(10) # wait for workflow runtime to start wf_client = wf.DaprWorkflowClient() instance_id = wf_client.schedule_new_workflow(workflow=batch_processing_workflow, input=10) print(f'Workflow started. Instance ID: {instance_id}') state = wf_client.wait_for_workflow_completion(instance_id) - workflowRuntime.shutdown() + wfr.shutdown() diff --git a/examples/workflow/human_approval.py b/examples/workflow/human_approval.py index ab4dbf3c4..6a8a725d7 100644 --- a/examples/workflow/human_approval.py +++ b/examples/workflow/human_approval.py @@ -18,6 +18,8 @@ from dapr.clients import DaprClient import dapr.ext.workflow as wf +wfr = wf.WorkflowRuntime() + @dataclass class Order: @@ -38,6 +40,7 @@ def from_dict(dict): return Approval(**dict) +@wfr.workflow(name='purchase_order_wf') def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order): # Orders under $1000 are auto-approved if order.cost < 1000: @@ -59,10 +62,12 @@ def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order): return f"Approved by '{approval_details.approver}'" +@wfr.activity(name='send_approval') def send_approval_request(_, order: Order) -> None: print(f'*** Requesting approval from user for order: {order}') +@wfr.activity def place_order(_, order: Order) -> None: print(f'*** Placing order: {order}') @@ -76,12 +81,8 @@ def place_order(_, order: Order) -> None: parser.add_argument('--timeout', type=int, default=60, help='Timeout in seconds') args = parser.parse_args() - # configure and start the workflow runtime - workflowRuntime = wf.WorkflowRuntime('localhost', '50001') - workflowRuntime.register_workflow(purchase_order_workflow) - workflowRuntime.register_activity(send_approval_request) - workflowRuntime.register_activity(place_order) - workflowRuntime.start() + # start the workflow runtime + wfr.start() # Start a purchase order workflow using the user input order = Order(args.cost, 'MyProduct', 1) @@ -118,4 +119,4 @@ def prompt_for_approval(): except TimeoutError: print('*** Workflow timed out!') - workflowRuntime.shutdown() + wfr.shutdown() diff --git a/examples/workflow/monitor.py b/examples/workflow/monitor.py index 0a9bbcd91..a6da1c7db 100644 --- a/examples/workflow/monitor.py +++ b/examples/workflow/monitor.py @@ -13,8 +13,11 @@ from dataclasses import dataclass from datetime import timedelta import random +from time import sleep import dapr.ext.workflow as wf +wfr = wf.WorkflowRuntime() + @dataclass class JobStatus: @@ -22,6 +25,7 @@ class JobStatus: is_healthy: bool +@wfr.workflow(name='status_monitor') def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus): # poll a status endpoint associated with this job status = yield ctx.call_activity(check_status, input=job) @@ -43,20 +47,19 @@ def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus): ctx.continue_as_new(job) +@wfr.activity def check_status(ctx, _) -> str: return random.choice(['healthy', 'unhealthy']) +@wfr.activity def send_alert(ctx, message: str): print(f'*** Alert: {message}') if __name__ == '__main__': - workflowRuntime = wf.WorkflowRuntime() - workflowRuntime.register_workflow(status_monitor_workflow) - workflowRuntime.register_activity(check_status) - workflowRuntime.register_activity(send_alert) - workflowRuntime.start() + wfr.start() + sleep(10) # wait for workflow runtime to start wf_client = wf.DaprWorkflowClient() job_id = 'job1' @@ -76,4 +79,4 @@ def send_alert(ctx, message: str): print(f'Workflow already running. Instance ID: {job_id}') input('Press Enter to stop...\n') - workflowRuntime.shutdown() + wfr.shutdown() diff --git a/examples/workflow/task_chaining.py b/examples/workflow/task_chaining.py index b44f9c753..c24e340cf 100644 --- a/examples/workflow/task_chaining.py +++ b/examples/workflow/task_chaining.py @@ -10,9 +10,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +from time import sleep + import dapr.ext.workflow as wf +wfr = wf.WorkflowRuntime() + + +@wfr.workflow(name='random_workflow') def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): try: result1 = yield ctx.call_activity(step1, input=wf_input) @@ -24,37 +30,36 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): return [result1, result2, result3] +@wfr.activity(name='step10') def step1(ctx, activity_input): print(f'Step 1: Received input: {activity_input}.') # Do some work return activity_input + 1 +@wfr.activity def step2(ctx, activity_input): print(f'Step 2: Received input: {activity_input}.') # Do some work return activity_input * 2 +@wfr.activity def step3(ctx, activity_input): print(f'Step 3: Received input: {activity_input}.') # Do some work return activity_input ^ 2 +@wfr.activity def error_handler(ctx, error): print(f'Executing error handler: {error}.') # Do some compensating work if __name__ == '__main__': - workflowRuntime = wf.WorkflowRuntime('localhost', '50001') - workflowRuntime.register_workflow(task_chain_workflow) - workflowRuntime.register_activity(step1) - workflowRuntime.register_activity(step2) - workflowRuntime.register_activity(step3) - workflowRuntime.register_activity(error_handler) - workflowRuntime.start() + wfr.start() + sleep(10) # wait for workflow runtime to start wf_client = wf.DaprWorkflowClient() instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42) @@ -62,4 +67,4 @@ def error_handler(ctx, error): state = wf_client.wait_for_workflow_completion(instance_id) print(f'Workflow completed! Status: {state.runtime_status}') - workflowRuntime.shutdown() + wfr.shutdown() diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py index 22a55661a..99d31b15e 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py @@ -14,7 +14,7 @@ """ # Import your main classes here -from dapr.ext.workflow.workflow_runtime import WorkflowRuntime +from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext @@ -29,4 +29,5 @@ 'WorkflowStatus', 'when_all', 'when_any', + 'alternate_name', ] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 62c0233d4..73eadc6f1 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -81,6 +81,13 @@ def schedule_new_workflow( Returns: The ID of the scheduled workflow instance. """ + if hasattr(workflow, '_dapr_alternate_name'): + return self.__obj.schedule_new_orchestration( + workflow.__dict__['_dapr_alternate_name'], + input=input, + instance_id=instance_id, + start_at=start_at, + ) return self.__obj.schedule_new_orchestration( workflow.__name__, input=input, instance_id=instance_id, start_at=start_at ) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index 551ad1075..a868c2e15 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -57,6 +57,11 @@ def call_activity( *, input: TInput = None, ) -> task.Task[TOutput]: + if hasattr(activity, '_dapr_alternate_name'): + return self.__obj.call_activity( + activity=activity.__dict__['_dapr_alternate_name'], input=input + ) + # this return should ideally never execute return self.__obj.call_activity(activity=activity.__name__, input=input) def call_child_workflow( @@ -67,7 +72,12 @@ def wf(ctx: task.OrchestrationContext, inp: TInput): return workflow(daprWfContext, inp) # copy workflow name so durabletask.worker can find the orchestrator in its registry - wf.__name__ = workflow.__name__ + + if hasattr(workflow, '_dapr_alternate_name'): + wf.__name__ = workflow.__dict__['_dapr_alternate_name'] + else: + # this case should ideally never happen + wf.__name__ = workflow.__name__ return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id) def wait_for_external_event(self, name: str) -> task.Task: diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 67d0c5dd3..4653ee265 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -13,6 +13,8 @@ limitations under the License. """ +import inspect +from functools import wraps from typing import Optional, TypeVar from durabletask import worker, task @@ -50,17 +52,32 @@ def __init__(self, host: Optional[str] = None, port: Optional[str] = None): host_address=uri.endpoint, metadata=metadata, secure_channel=uri.tls ) - def register_workflow(self, fn: Workflow): + def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): - """Responsible to call Workflow function in orchestrationWrapper""" + """Responsible to call Workflow function in orchestrationWrapper.""" daprWfContext = DaprWorkflowContext(ctx) if inp is None: return fn(daprWfContext) return fn(daprWfContext, inp) - self.__worker._registry.add_named_orchestrator(fn.__name__, orchestrationWrapper) + if hasattr(fn, '_workflow_registered'): + # whenever a workflow is registered, it has a _dapr_alternate_name attribute + alt_name = fn.__dict__['_dapr_alternate_name'] + raise ValueError(f'Workflow {fn.__name__} already registered as {alt_name}') + if hasattr(fn, '_dapr_alternate_name'): + alt_name = fn._dapr_alternate_name + if name is not None: + m = f'Workflow {fn.__name__} already has an alternate name {alt_name}' + raise ValueError(m) + else: + fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ + + self.__worker._registry.add_named_orchestrator( + fn.__dict__['_dapr_alternate_name'], orchestrationWrapper + ) + fn.__dict__['_workflow_registered'] = True - def register_activity(self, fn: Activity): + def register_activity(self, fn: Activity, *, name: Optional[str] = None): """Registers a workflow activity as a function that takes a specified input type and returns a specified output type. """ @@ -72,7 +89,22 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): return fn(wfActivityContext) return fn(wfActivityContext, inp) - self.__worker._registry.add_named_activity(fn.__name__, activityWrapper) + if hasattr(fn, '_activity_registered'): + # whenever an activity is registered, it has a _dapr_alternate_name attribute + alt_name = fn.__dict__['_dapr_alternate_name'] + raise ValueError(f'Activity {fn.__name__} already registered as {alt_name}') + if hasattr(fn, '_dapr_alternate_name'): + alt_name = fn._dapr_alternate_name + if name is not None: + m = f'Activity {fn.__name__} already has an alternate name {alt_name}' + raise ValueError(m) + else: + fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ + + self.__worker._registry.add_named_activity( + fn.__dict__['_dapr_alternate_name'], activityWrapper + ) + fn.__dict__['_activity_registered'] = True def start(self): """Starts the listening for work items on a background thread.""" @@ -81,3 +113,134 @@ def start(self): def shutdown(self): """Stops the listening for work items on a background thread.""" self.__worker.stop() + + def workflow(self, __fn: Workflow = None, *, name: Optional[str] = None): + """Decorator to register a workflow function. + + This example shows how to register a workflow function with a name: + + from dapr.ext.workflow import WorkflowRuntime + wfr = WorkflowRuntime() + + @wfr.workflow(name="add") + def add(ctx, x: int, y: int) -> int: + return x + y + + This example shows how to register a workflow function without + an alternate name: + + from dapr.ext.workflow import WorkflowRuntime + wfr = WorkflowRuntime() + + @wfr.workflow + def add(ctx, x: int, y: int) -> int: + return x + y + + Args: + name (Optional[str], optional): Name to identify the workflow function as in + the workflow runtime. Defaults to None. + """ + + def wrapper(fn: Workflow): + self.register_workflow(fn, name=name) + + @wraps(fn) + def innerfn(): + return fn + + if hasattr(fn, '_dapr_alternate_name'): + innerfn.__dict__['_dapr_alternate_name'] = fn.__dict__['_dapr_alternate_name'] + else: + innerfn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ + innerfn.__signature__ = inspect.signature(fn) + return innerfn + + if __fn: + # This case is true when the decorator is used without arguments + # and the function to be decorated is passed as the first argument. + return wrapper(__fn) + + return wrapper + + def activity(self, __fn: Activity = None, *, name: Optional[str] = None): + """Decorator to register an activity function. + + This example shows how to register an activity function with an alternate name: + + from dapr.ext.workflow import WorkflowRuntime + wfr = WorkflowRuntime() + + @wfr.activity(name="add") + def add(ctx, x: int, y: int) -> int: + return x + y + + This example shows how to register an activity function without an alternate name: + + from dapr.ext.workflow import WorkflowRuntime + wfr = WorkflowRuntime() + + @wfr.activity + def add(ctx, x: int, y: int) -> int: + return x + y + + Args: + name (Optional[str], optional): Name to identify the activity function as in + the workflow runtime. Defaults to None. + """ + + def wrapper(fn: Activity): + self.register_activity(fn, name=name) + + @wraps(fn) + def innerfn(): + return fn + + if hasattr(fn, '_dapr_alternate_name'): + innerfn.__dict__['_dapr_alternate_name'] = fn.__dict__['_dapr_alternate_name'] + else: + innerfn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ + innerfn.__signature__ = inspect.signature(fn) + return innerfn + + if __fn: + # This case is true when the decorator is used without arguments + # and the function to be decorated is passed as the first argument. + return wrapper(__fn) + + return wrapper + + +def alternate_name(name: Optional[str] = None): + """Decorator to register a workflow or activity function with an alternate name. + + This example shows how to register a workflow function with an alternate name: + + from dapr.ext.workflow import WorkflowRuntime + wfr = WorkflowRuntime() + + @wfr.workflow + @alternate_name(add") + def add(ctx, x: int, y: int) -> int: + return x + y + + Args: + name (Optional[str], optional): Name to identify the workflow or activity function as in + the workflow runtime. Defaults to None. + """ + + def wrapper(fn: any): + if hasattr(fn, '_dapr_alternate_name'): + raise ValueError( + f'Function {fn.__name__} already has an alternate name {fn._dapr_alternate_name}' + ) + fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ + + @wraps(fn) + def innerfn(*args, **kwargs): + return fn(*args, **kwargs) + + innerfn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ + innerfn.__signature__ = inspect.signature(fn) + return innerfn + + return wrapper diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index fb8bc344a..02d6c6f3b 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -17,7 +17,7 @@ import unittest from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from unittest import mock -from dapr.ext.workflow.workflow_runtime import WorkflowRuntime +from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext listOrchestrators: List[str] = [] @@ -33,20 +33,140 @@ def add_named_activity(self, name: str, fn): class WorkflowRuntimeTest(unittest.TestCase): + def setUp(self): + listActivities.clear() + listOrchestrators.clear() + mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() + self.runtime_options = WorkflowRuntime() + if hasattr(self.mock_client_wf, '_dapr_alternate_name'): + del self.mock_client_wf.__dict__['_dapr_alternate_name'] + if hasattr(self.mock_client_activity, '_dapr_alternate_name'): + del self.mock_client_activity.__dict__['_dapr_alternate_name'] + if hasattr(self.mock_client_wf, '_workflow_registered'): + del self.mock_client_wf.__dict__['_workflow_registered'] + if hasattr(self.mock_client_activity, '_activity_registered'): + del self.mock_client_activity.__dict__['_activity_registered'] + def mock_client_wf(ctx: DaprWorkflowContext, input): print(f'{input}') def mock_client_activity(ctx: WorkflowActivityContext, input): print(f'{input}!', flush=True) - def test_runtime_options(self): - with mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()): - runtime_options = WorkflowRuntime() + def test_register(self): + self.runtime_options.register_workflow(self.mock_client_wf, name='mock_client_wf') + wanted_orchestrator = [self.mock_client_wf.__name__] + assert listOrchestrators == wanted_orchestrator + assert self.mock_client_wf._dapr_alternate_name == 'mock_client_wf' + assert self.mock_client_wf._workflow_registered + + self.runtime_options.register_activity(self.mock_client_activity) + wanted_activity = [self.mock_client_activity.__name__] + assert listActivities == wanted_activity + assert self.mock_client_activity._activity_registered + + def test_decorator_register(self): + client_wf = (self.runtime_options.workflow())(self.mock_client_wf) + wanted_orchestrator = [self.mock_client_wf.__name__] + assert listOrchestrators == wanted_orchestrator + assert client_wf._dapr_alternate_name == self.mock_client_wf.__name__ + assert self.mock_client_wf._workflow_registered + + client_activity = (self.runtime_options.activity())(self.mock_client_activity) + wanted_activity = [self.mock_client_activity.__name__] + assert listActivities == wanted_activity + assert client_activity._dapr_alternate_name == self.mock_client_activity.__name__ + assert self.mock_client_activity._activity_registered + + def test_both_decorator_and_register(self): + client_wf = (self.runtime_options.workflow(name='test_wf'))(self.mock_client_wf) + wanted_orchestrator = ['test_wf'] + assert listOrchestrators == wanted_orchestrator + assert client_wf._dapr_alternate_name == 'test_wf' + assert self.mock_client_wf._workflow_registered + + self.runtime_options.register_activity(self.mock_client_activity, name='test_act') + wanted_activity = ['test_act'] + assert listActivities == wanted_activity + assert hasattr(self.mock_client_activity, '_dapr_alternate_name') + assert self.mock_client_activity._activity_registered + + def test_register_wf_act_using_both_decorator_and_method(self): + client_wf = (self.runtime_options.workflow(name='test_wf'))(self.mock_client_wf) + + wanted_orchestrator = ['test_wf'] + assert listOrchestrators == wanted_orchestrator + assert client_wf._dapr_alternate_name == 'test_wf' + with self.assertRaises(ValueError) as exeception_context: + self.runtime_options.register_workflow(self.mock_client_wf) + wf_name = self.mock_client_wf.__name__ + self.assertEqual( + exeception_context.exception.args[0], + f'Workflow {wf_name} already registered as test_wf', + ) + + client_act = (self.runtime_options.activity(name='test_act'))(self.mock_client_activity) + wanted_activity = ['test_act'] + assert listActivities == wanted_activity + assert client_act._dapr_alternate_name == 'test_act' + with self.assertRaises(ValueError) as exeception_context: + self.runtime_options.register_activity(self.mock_client_activity) + act_name = self.mock_client_activity.__name__ + self.assertEqual( + exeception_context.exception.args[0], + f'Activity {act_name} already registered as test_act', + ) + + def test_duplicate_dapr_alternate_name_registration(self): + client_wf = (alternate_name(name='test'))(self.mock_client_wf) + with self.assertRaises(ValueError) as exeception_context: + (self.runtime_options.workflow(name='random'))(client_wf) + self.assertEqual( + exeception_context.exception.args[0], + f'Workflow {client_wf.__name__} already has an alternate name test', + ) + + client_act = (alternate_name(name='test'))(self.mock_client_activity) + with self.assertRaises(ValueError) as exeception_context: + (self.runtime_options.activity(name='random'))(client_act) + self.assertEqual( + exeception_context.exception.args[0], + f'Activity {client_act.__name__} already has an alternate name test', + ) + + def test_register_wf_act_using_both_decorator_and_method_without_name(self): + client_wf = (self.runtime_options.workflow())(self.mock_client_wf) + + wanted_orchestrator = ['mock_client_wf'] + assert listOrchestrators == wanted_orchestrator + assert client_wf._dapr_alternate_name == 'mock_client_wf' + with self.assertRaises(ValueError) as exeception_context: + self.runtime_options.register_workflow(self.mock_client_wf, name='test_wf') + wf_name = self.mock_client_wf.__name__ + self.assertEqual( + exeception_context.exception.args[0], + f'Workflow {wf_name} already registered as mock_client_wf', + ) + + client_act = (self.runtime_options.activity())(self.mock_client_activity) + wanted_activity = ['mock_client_activity'] + assert listActivities == wanted_activity + assert client_act._dapr_alternate_name == 'mock_client_activity' + with self.assertRaises(ValueError) as exeception_context: + self.runtime_options.register_activity(self.mock_client_activity, name='test_act') + act_name = self.mock_client_activity.__name__ + self.assertEqual( + exeception_context.exception.args[0], + f'Activity {act_name} already registered as mock_client_activity', + ) - runtime_options.register_workflow(self.mock_client_wf) - wanted_orchestrator = [self.mock_client_wf.__name__] - assert listOrchestrators == wanted_orchestrator + def test_decorator_register_optinal_name(self): + client_wf = (self.runtime_options.workflow(name='test_wf'))(self.mock_client_wf) + wanted_orchestrator = ['test_wf'] + assert listOrchestrators == wanted_orchestrator + assert client_wf._dapr_alternate_name == 'test_wf' - runtime_options.register_activity(self.mock_client_activity) - wanted_activity = [self.mock_client_activity.__name__] - assert listActivities == wanted_activity + client_act = (self.runtime_options.activity(name='test_act'))(self.mock_client_activity) + wanted_activity = ['test_act'] + assert listActivities == wanted_activity + assert client_act._dapr_alternate_name == 'test_act' diff --git a/setup.cfg b/setup.cfg index 962e70189..cfe3ff888 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,8 +53,10 @@ dapr.serializers = py.typed [flake8] -exclude = +exclude = + .venv, venv, + .env, build, dist, .git, diff --git a/tox.ini b/tox.ini index 9e16e0e62..e33bd3c30 100644 --- a/tox.ini +++ b/tox.ini @@ -59,6 +59,7 @@ commands = ./validate.sh distributed_lock ./validate.sh configuration ./validate.sh demo_workflow + ./validate.sh workflow ./validate.sh ../ commands_pre = pip3 install -e {toxinidir}/