diff --git a/README.md b/README.md
index 9a7d597cc..233cfe3c9 100644
--- a/README.md
+++ b/README.md
@@ -81,6 +81,7 @@ cd python-sdk
 pip3 install -e .
 pip3 install -e ./ext/dapr-ext-grpc/
 pip3 install -e ./ext/dapr-ext-fastapi/
+pip3 install -e ./ext/dapr-ext-workflow/
 ```
 
 3. Install required packages
diff --git a/examples/workflow/README.md b/examples/workflow/README.md
index 769d6820f..63208c8e3 100644
--- a/examples/workflow/README.md
+++ b/examples/workflow/README.md
@@ -22,10 +22,20 @@ Each of the examples in this directory can be run directly from the command line
 ### Task Chaining
 
 This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:
+
 ```sh
 dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py
 ```
+
 The output of this example should look like this:
@@ -41,9 +51,38 @@ The output of this example should look like this:
 
 This example demonstrates how to fan-out a workflow into multiple parallel tasks, and then fan-in the results of those tasks. You can run this sample using the following command:
+
+
 ```sh
 dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py
 ```
+
 The output of this sample should look like this:
diff --git a/examples/workflow/child_workflow.py b/examples/workflow/child_workflow.py
index 20fc1382d..b4d2cf274 100644
--- a/examples/workflow/child_workflow.py
+++ b/examples/workflow/child_workflow.py
@@ -13,32 +13,33 @@
 import dapr.ext.workflow as wf
 import time
 
+wfr = wf.WorkflowRuntime()
+
+@wfr.workflow
 def main_workflow(ctx: wf.DaprWorkflowContext):
     try:
         instance_id = ctx.instance_id
         child_instance_id = instance_id + '-child'
-        print(f'*** Calling child workflow {child_instance_id}')
+        print(f'*** Calling child workflow {child_instance_id}', flush=True)
         yield ctx.call_child_workflow(workflow=child_workflow,input=None,instance_id=child_instance_id)
     except Exception as e:
         print(f'*** Exception: {e}')
-
+
     return
 
+@wfr.workflow
 def child_workflow(ctx: wf.DaprWorkflowContext):
     instance_id = ctx.instance_id
-    print(f'*** Child workflow {instance_id} called')
+    print(f'*** Child workflow {instance_id} called', flush=True)
 
 if __name__ == '__main__':
-    workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
-    workflowRuntime.register_workflow(main_workflow)
-    workflowRuntime.register_workflow(child_workflow)
-    workflowRuntime.start()
+    wfr.start()
+    time.sleep(10)  # wait for workflow runtime to start
 
     wf_client = wf.DaprWorkflowClient()
-    instance_id = wf_client.schedule_new_workflow(
-        workflow=main_workflow)
+    instance_id = wf_client.schedule_new_workflow(workflow=main_workflow)
     # Wait for the workflow to complete
     time.sleep(5)
-
-    workflowRuntime.shutdown()
\ No newline at end of file
+
+    wfr.shutdown()
\ No newline at end of file
diff --git a/examples/workflow/fan_out_fan_in.py b/examples/workflow/fan_out_fan_in.py
index 458737c6f..dff17c17f 100644
--- a/examples/workflow/fan_out_fan_in.py
+++ b/examples/workflow/fan_out_fan_in.py
@@ -14,7 +14,9 @@
 from typing import List
 import dapr.ext.workflow as wf
 
+wfr = wf.WorkflowRuntime()
 
+@wfr.workflow(name="batch_processing")
 def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
     # get a batch of N work items to process in parallel
     work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
@@ -27,11 +29,11 @@ def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
     total = sum(outputs)
     yield ctx.call_activity(process_results, input=total)
 
-
+@wfr.activity(name="get_batch")
 def get_work_batch(ctx, batch_size: int) -> List[int]:
     return [i + 1 for i in range(batch_size)]
 
-
+@wfr.activity
 def process_work_item(ctx, work_item: int) -> int:
     print(f'Processing work item: {work_item}.')
     time.sleep(5)
@@ -39,18 +41,14 @@ def process_work_item(ctx, work_item: int) -> int:
     print(f'Work item {work_item} processed. Result: {result}.')
     return result
 
-
+@wfr.activity(name="final_process")
 def process_results(ctx, final_result: int):
     print(f'Final result: {final_result}.')
 
 
 if __name__ == '__main__':
-    workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
-    workflowRuntime.register_workflow(batch_processing_workflow)
-    workflowRuntime.register_activity(get_work_batch)
-    workflowRuntime.register_activity(process_work_item)
-    workflowRuntime.register_activity(process_results)
-    workflowRuntime.start()
+    wfr.start()
+    time.sleep(10)  # wait for workflow runtime to start
 
     wf_client = wf.DaprWorkflowClient()
     instance_id = wf_client.schedule_new_workflow(
@@ -59,4 +57,4 @@ def process_results(ctx, final_result: int):
     print(f'Workflow started. Instance ID: {instance_id}')
 
     state = wf_client.wait_for_workflow_completion(instance_id)
-    workflowRuntime.shutdown()
+    wfr.shutdown()
diff --git a/examples/workflow/human_approval.py b/examples/workflow/human_approval.py
index 78662dc55..f67eca0b1 100644
--- a/examples/workflow/human_approval.py
+++ b/examples/workflow/human_approval.py
@@ -18,6 +18,7 @@ from dapr.clients import DaprClient
 import dapr.ext.workflow as wf
 
+wfr = wf.WorkflowRuntime()
 
 @dataclass
 class Order:
@@ -37,7 +38,7 @@ class Approval:
     def from_dict(dict):
         return Approval(**dict)
 
-
+@wfr.workflow(name="purchase_order_wf")
 def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
     # Orders under $1000 are auto-approved
     if order.cost < 1000:
@@ -59,10 +60,12 @@ def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
     return f"Approved by '{approval_details.approver}'"
 
 
+@wfr.activity(name="send_approval")
 def send_approval_request(_, order: Order) -> None:
     print(f'*** Requesting approval from user for order: {order}')
 
 
+@wfr.activity
 def place_order(_, order: Order) -> None:
     print(f'*** Placing order: {order}')
 
@@ -76,12 +79,8 @@ def place_order(_, order: Order) -> None:
     parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds")
     args = parser.parse_args()
 
-    # configure and start the workflow runtime
-    workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
-    workflowRuntime.register_workflow(purchase_order_workflow)
-    workflowRuntime.register_activity(send_approval_request)
-    workflowRuntime.register_activity(place_order)
-    workflowRuntime.start()
+    # start the workflow runtime
+    wfr.start()
 
     # Start a purchase order workflow using the user input
     order = Order(args.cost, "MyProduct", 1)
@@ -119,4 +118,4 @@ def prompt_for_approval():
     except TimeoutError:
         print("*** Workflow timed out!")
 
-    workflowRuntime.shutdown()
+    wfr.shutdown()
diff --git a/examples/workflow/monitor.py b/examples/workflow/monitor.py
index 150deae8b..606319705 100644
--- a/examples/workflow/monitor.py
+++ b/examples/workflow/monitor.py
@@ -13,15 +13,17 @@
 from dataclasses import dataclass
 from datetime import timedelta
 import random
+from time import sleep
 import dapr.ext.workflow as wf
 
-
+wfr = wf.WorkflowRuntime()
 
 @dataclass
 class JobStatus:
     job_id: str
     is_healthy: bool
 
+@wfr.workflow(name="status_monitor")
 def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
     # poll a status endpoint associated with this job
     status = yield ctx.call_activity(check_status, input=job)
@@ -43,20 +45,19 @@ def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
         ctx.continue_as_new(job)
 
 
+@wfr.activity
 def check_status(ctx, _) -> str:
     return random.choice(["healthy", "unhealthy"])
 
 
+@wfr.activity
 def send_alert(ctx, message: str):
     print(f'*** Alert: {message}')
 
 
 if __name__ == '__main__':
-    workflowRuntime = wf.WorkflowRuntime()
-    workflowRuntime.register_workflow(status_monitor_workflow)
-    workflowRuntime.register_activity(check_status)
-    workflowRuntime.register_activity(send_alert)
-    workflowRuntime.start()
+    wfr.start()
+    sleep(10)  # wait for workflow runtime to start
 
     wf_client = wf.DaprWorkflowClient()
     job_id = "job1"
@@ -75,4 +76,4 @@ def send_alert(ctx, message: str):
         print(f'Workflow already running. Instance ID: {job_id}')
 
     input("Press Enter to stop...\n")
-    workflowRuntime.shutdown()
+    wfr.shutdown()
diff --git a/examples/workflow/task_chaining.py b/examples/workflow/task_chaining.py
index 767a1d9d3..aeefd2f07 100644
--- a/examples/workflow/task_chaining.py
+++ b/examples/workflow/task_chaining.py
@@ -10,9 +10,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from time import sleep
+
 import dapr.ext.workflow as wf
 
+wfr = wf.WorkflowRuntime()
+
+@wfr.workflow(name="random_workflow")
 def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
     try:
         result1 = yield ctx.call_activity(step1, input=wf_input)
@@ -24,37 +29,36 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
     return [result1, result2, result3]
 
 
+@wfr.activity(name="step10")
 def step1(ctx, activity_input):
     print(f'Step 1: Received input: {activity_input}.')
     # Do some work
     return activity_input + 1
 
 
+@wfr.activity
 def step2(ctx, activity_input):
     print(f'Step 2: Received input: {activity_input}.')
     # Do some work
     return activity_input * 2
 
 
+@wfr.activity
 def step3(ctx, activity_input):
     print(f'Step 3: Received input: {activity_input}.')
     # Do some work
     return activity_input ^ 2
 
 
+@wfr.activity
 def error_handler(ctx, error):
     print(f'Executing error handler: {error}.')
     # Do some compensating work
 
 
 if __name__ == '__main__':
-    workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
-    workflowRuntime.register_workflow(task_chain_workflow)
-    workflowRuntime.register_activity(step1)
-    workflowRuntime.register_activity(step2)
-    workflowRuntime.register_activity(step3)
-    workflowRuntime.register_activity(error_handler)
-    workflowRuntime.start()
+    wfr.start()
+    sleep(10)  # wait for workflow runtime to start
 
     wf_client = wf.DaprWorkflowClient()
     instance_id = wf_client.schedule_new_workflow(
@@ -64,4 +68,4 @@ def error_handler(ctx, error):
 
     state = wf_client.wait_for_workflow_completion(instance_id)
     print(f'Workflow completed! Status: {state.runtime_status}')
-    workflowRuntime.shutdown()
+    wfr.shutdown()
diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
index 9f9146ec3..e9314d444 100644
--- a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
+++ b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
@@ -14,7 +14,7 @@
 """
 
 # Import your main classes here
-from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
+from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
 from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
 from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
 from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
@@ -28,5 +28,6 @@
     'WorkflowState',
     'WorkflowStatus',
     'when_all',
-    'when_any'
+    'when_any',
+    'alternate_name'
 ]
diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
index 8f9207fa6..4553351e6 100644
--- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
+++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
@@ -75,8 +75,13 @@ def schedule_new_workflow(self, workflow: Workflow, *, input: Optional[TInput] =
         Returns:
             The ID of the scheduled workflow instance.
         """
+        if hasattr(workflow, '_dapr_alternate_name'):
+            return self.__obj.schedule_new_orchestration(workflow.__dict__['_dapr_alternate_name'],
+                                                         input=input, instance_id=instance_id,
+                                                         start_at=start_at)
         return self.__obj.schedule_new_orchestration(workflow.__name__, input=input,
-                                                     instance_id=instance_id, start_at=start_at)
+                                                     instance_id=instance_id,
+                                                     start_at=start_at)
 
     def get_workflow_state(self, instance_id: str, *,
                            fetch_payloads: bool = True) -> Optional[WorkflowState]:
diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py
index c65a48186..a1480c621 100644
--- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py
+++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py
@@ -53,6 +53,10 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
     def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
                       input: TInput = None) -> task.Task[TOutput]:
+        if hasattr(activity, '_dapr_alternate_name'):
+            return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'],
+                                            input=input)
+        # this return should ideally never execute
         return self.__obj.call_activity(activity=activity.__name__, input=input)
 
     def call_child_workflow(self, workflow: Workflow, *,
@@ -62,7 +66,12 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
             daprWfContext = DaprWorkflowContext(ctx)
             return workflow(daprWfContext, inp)
         # copy workflow name so durabletask.worker can find the orchestrator in its registry
-        wf.__name__ = workflow.__name__
+
+        if hasattr(workflow, '_dapr_alternate_name'):
+            wf.__name__ = workflow.__dict__['_dapr_alternate_name']
+        else:
+            # this case should ideally never happen
+            wf.__name__ = workflow.__name__
         return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
 
     def wait_for_external_event(self, name: str) -> task.Task:
diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
index a28413dff..67820946f 100644
--- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
+++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
@@ -13,6 +13,8 @@
 limitations under the License.
 """
 
+import inspect
+from functools import wraps
 from typing import Optional, TypeVar
 
 from durabletask import worker, task
@@ -50,17 +52,31 @@ def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
         self.__worker = worker.TaskHubGrpcWorker(host_address=uri.endpoint, metadata=metadata,
                                                  secure_channel=uri.tls)
 
-    def register_workflow(self, fn: Workflow):
-        def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
-            """Responsible to call Workflow function in orchestrationWrapper"""
+    def register_workflow(self, fn: Workflow, *, name: Optional[str] = None):
+        def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
+            """Responsible to call Workflow function in orchestrationWrapper."""
             daprWfContext = DaprWorkflowContext(ctx)
             if inp is None:
                 return fn(daprWfContext)
             return fn(daprWfContext, inp)
 
-        self.__worker._registry.add_named_orchestrator(fn.__name__, orchestrationWrapper)
-
-    def register_activity(self, fn: Activity):
+        if hasattr(fn, '_workflow_registered'):
+            # whenever a workflow is registered, it has a _dapr_alternate_name attribute
+            alt_name = fn.__dict__['_dapr_alternate_name']
+            raise ValueError(f'Workflow {fn.__name__} already registered as {alt_name}')
+        if hasattr(fn, '_dapr_alternate_name'):
+            alt_name = fn._dapr_alternate_name
+            if name is not None:
+                m = f'Workflow {fn.__name__} already has an alternate name {alt_name}'
+                raise ValueError(m)
+        else:
+            fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__
+
+        self.__worker._registry.add_named_orchestrator(fn.__dict__['_dapr_alternate_name'],
+                                                       orchestrationWrapper)
+        fn.__dict__['_workflow_registered'] = True
+
+    def register_activity(self, fn: Activity, *, name: Optional[str] = None):
         """Registers a workflow activity as a function that takes a specified input type and returns
         a specified output type.
         """
@@ -71,7 +87,21 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
                 return fn(wfActivityContext)
             return fn(wfActivityContext, inp)
 
-        self.__worker._registry.add_named_activity(fn.__name__, activityWrapper)
+        if hasattr(fn, '_activity_registered'):
+            # whenever an activity is registered, it has a _dapr_alternate_name attribute
+            alt_name = fn.__dict__['_dapr_alternate_name']
+            raise ValueError(f'Activity {fn.__name__} already registered as {alt_name}')
+        if hasattr(fn, '_dapr_alternate_name'):
+            alt_name = fn._dapr_alternate_name
+            if name is not None:
+                m = f'Activity {fn.__name__} already has an alternate name {alt_name}'
+                raise ValueError(m)
+        else:
+            fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__
+
+        self.__worker._registry.add_named_activity(fn.__dict__['_dapr_alternate_name'],
+                                                   activityWrapper)
+        fn.__dict__['_activity_registered'] = True
 
     def start(self):
         """Starts the listening for work items on a background thread."""
@@ -80,3 +110,132 @@ def start(self):
     def shutdown(self):
         """Stops the listening for work items on a background thread."""
         self.__worker.stop()
+
+    def workflow(self, __fn: Workflow = None, *, name: Optional[str] = None):
+        """Decorator to register a workflow function.
+
+        This example shows how to register a workflow function with an alternate name:
+
+        from dapr.ext.workflow import WorkflowRuntime
+        wfr = WorkflowRuntime()
+
+        @wfr.workflow(name="add")
+        def add(ctx, x: int, y: int) -> int:
+            return x + y
+
+        This example shows how to register a workflow function without
+        an alternate name:
+
+        from dapr.ext.workflow import WorkflowRuntime
+        wfr = WorkflowRuntime()
+
+        @wfr.workflow
+        def add(ctx, x: int, y: int) -> int:
+            return x + y
+
+        Args:
+            name (Optional[str], optional): Name to identify the workflow function as in
+                the workflow runtime. Defaults to None.
+        """
+
+        def wrapper(fn: Workflow):
+            self.register_workflow(fn, name=name)
+
+            @wraps(fn)
+            def innerfn():
+                return fn
+            if hasattr(fn, '_dapr_alternate_name'):
+                innerfn.__dict__['_dapr_alternate_name'] = fn.__dict__['_dapr_alternate_name']
+            else:
+                innerfn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__
+            innerfn.__signature__ = inspect.signature(fn)
+            return innerfn
+
+        if __fn:
+            # This case is true when the decorator is used without arguments
+            # and the function to be decorated is passed as the first argument.
+            return wrapper(__fn)
+
+        return wrapper
+
+    def activity(self, __fn: Activity = None, *, name: Optional[str] = None):
+        """Decorator to register an activity function.
+
+        This example shows how to register an activity function with an alternate name:
+
+        from dapr.ext.workflow import WorkflowRuntime
+        wfr = WorkflowRuntime()
+
+        @wfr.activity(name="add")
+        def add(ctx, x: int, y: int) -> int:
+            return x + y
+
+        This example shows how to register an activity function without an alternate name:
+
+        from dapr.ext.workflow import WorkflowRuntime
+        wfr = WorkflowRuntime()
+
+        @wfr.activity
+        def add(ctx, x: int, y: int) -> int:
+            return x + y
+
+        Args:
+            name (Optional[str], optional): Name to identify the activity function as in
+                the workflow runtime. Defaults to None.
+        """
+
+        def wrapper(fn: Activity):
+            self.register_activity(fn, name=name)
+
+            @wraps(fn)
+            def innerfn():
+                return fn
+
+            if hasattr(fn, '_dapr_alternate_name'):
+                innerfn.__dict__['_dapr_alternate_name'] = fn.__dict__['_dapr_alternate_name']
+            else:
+                innerfn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__
+            innerfn.__signature__ = inspect.signature(fn)
+            return innerfn
+
+        if __fn:
+            # This case is true when the decorator is used without arguments
+            # and the function to be decorated is passed as the first argument.
+            return wrapper(__fn)
+
+        return wrapper
+
+
+def alternate_name(name: Optional[str] = None):
+    """Decorator to register a workflow or activity function with an alternate name.
+
+    This example shows how to register a workflow function with an alternate name:
+
+    from dapr.ext.workflow import WorkflowRuntime
+    wfr = WorkflowRuntime()
+
+    @wfr.workflow
+    @alternate_name(name="add")
+    def add(ctx, x: int, y: int) -> int:
+        return x + y
+
+    Args:
+        name (Optional[str], optional): Name to identify the workflow or activity function as in
+            the workflow runtime. Defaults to None.
+    """
+
+    def wrapper(fn: any):
+        if hasattr(fn, '_dapr_alternate_name'):
+            raise ValueError(
+                f'Function {fn.__name__} already has an alternate name {fn._dapr_alternate_name}')
+        fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__
+
+        @wraps(fn)
+        def innerfn(*args, **kwargs):
+            return fn(*args, **kwargs)
+
+        innerfn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__
+        innerfn.__signature__ = inspect.signature(fn)
+        return innerfn
+
+    return wrapper
diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py
index f38ca919b..c499a7851 100644
--- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py
+++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py
@@ -17,7 +17,7 @@ import unittest
 
 from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
 from unittest import mock
-from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
+from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
 from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
 
 listOrchestrators: List[str] = []
@@ -34,20 +34,128 @@ def add_named_activity(self, name: str, fn):
 
 
 class WorkflowRuntimeTest(unittest.TestCase):
+    def setUp(self):
+        listActivities.clear()
+        listOrchestrators.clear()
+        mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start()
+        self.runtime_options = WorkflowRuntime()
+        if hasattr(self.mock_client_wf, "_dapr_alternate_name"):
+            del self.mock_client_wf.__dict__["_dapr_alternate_name"]
+        if hasattr(self.mock_client_activity, "_dapr_alternate_name"):
+            del self.mock_client_activity.__dict__["_dapr_alternate_name"]
+        if hasattr(self.mock_client_wf, "_workflow_registered"):
+            del self.mock_client_wf.__dict__["_workflow_registered"]
+        if hasattr(self.mock_client_activity, "_activity_registered"):
+            del self.mock_client_activity.__dict__["_activity_registered"]
+
     def mock_client_wf(ctx: DaprWorkflowContext, input):
         print(f'{input}')
 
     def mock_client_activity(ctx: WorkflowActivityContext, input):
         print(f'{input}!', flush=True)
 
-    def test_runtime_options(self):
-        with mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()):
-            runtime_options = WorkflowRuntime()
+    def test_register(self):
+        self.runtime_options.register_workflow(self.mock_client_wf, name="mock_client_wf")
+        wanted_orchestrator = [self.mock_client_wf.__name__]
+        assert listOrchestrators == wanted_orchestrator
+        assert self.mock_client_wf._dapr_alternate_name == "mock_client_wf"
+        assert self.mock_client_wf._workflow_registered
+
+        self.runtime_options.register_activity(self.mock_client_activity)
+        wanted_activity = [self.mock_client_activity.__name__]
+        assert listActivities == wanted_activity
+        assert self.mock_client_activity._activity_registered
+
+    def test_decorator_register(self):
+        client_wf = (self.runtime_options.workflow())(self.mock_client_wf)
+        wanted_orchestrator = [self.mock_client_wf.__name__]
+        assert listOrchestrators == wanted_orchestrator
+        assert client_wf._dapr_alternate_name == self.mock_client_wf.__name__
+        assert self.mock_client_wf._workflow_registered
+
+        client_activity = (self.runtime_options.activity())(self.mock_client_activity)
+        wanted_activity = [self.mock_client_activity.__name__]
+        assert listActivities == wanted_activity
+        assert client_activity._dapr_alternate_name == self.mock_client_activity.__name__
+        assert self.mock_client_activity._activity_registered
+
+    def test_both_decorator_and_register(self):
+        client_wf = (self.runtime_options.workflow(name="test_wf"))(self.mock_client_wf)
+        wanted_orchestrator = ["test_wf"]
+        assert listOrchestrators == wanted_orchestrator
+        assert client_wf._dapr_alternate_name == "test_wf"
+        assert self.mock_client_wf._workflow_registered
+
+        self.runtime_options.register_activity(self.mock_client_activity, name="test_act")
+        wanted_activity = ["test_act"]
+        assert listActivities == wanted_activity
+        assert hasattr(self.mock_client_activity, "_dapr_alternate_name")
+        assert self.mock_client_activity._activity_registered
+
+    def test_register_wf_act_using_both_decorator_and_method(self):
+        client_wf = (self.runtime_options.workflow(name="test_wf"))(self.mock_client_wf)
+
+        wanted_orchestrator = ["test_wf"]
+        assert listOrchestrators == wanted_orchestrator
+        assert client_wf._dapr_alternate_name == "test_wf"
+        with self.assertRaises(ValueError) as exception_context:
+            self.runtime_options.register_workflow(self.mock_client_wf)
+        wf_name = self.mock_client_wf.__name__
+        self.assertEqual(exception_context.exception.args[0],
+                         f'Workflow {wf_name} already registered as test_wf')
+
+        client_act = (self.runtime_options.activity(name="test_act"))(self.mock_client_activity)
+        wanted_activity = ["test_act"]
+        assert listActivities == wanted_activity
+        assert client_act._dapr_alternate_name == "test_act"
+        with self.assertRaises(ValueError) as exception_context:
+            self.runtime_options.register_activity(self.mock_client_activity)
+        act_name = self.mock_client_activity.__name__
+        self.assertEqual(exception_context.exception.args[0],
+                         f'Activity {act_name} already registered as test_act')
+
+    def test_duplicate_dapr_alternate_name_registration(self):
+        client_wf = (alternate_name(name="test"))(self.mock_client_wf)
+        with self.assertRaises(ValueError) as exception_context:
+            (self.runtime_options.workflow(name="random"))(client_wf)
+        self.assertEqual(exception_context.exception.args[0],
+                         f'Workflow {client_wf.__name__} already has an alternate name test')
+
+        client_act = (alternate_name(name="test"))(self.mock_client_activity)
+        with self.assertRaises(ValueError) as exception_context:
+            (self.runtime_options.activity(name="random"))(client_act)
+        self.assertEqual(exception_context.exception.args[0],
+                         f'Activity {client_act.__name__} already has an alternate name test')
+
+    def test_register_wf_act_using_both_decorator_and_method_without_name(self):
+        client_wf = (self.runtime_options.workflow())(self.mock_client_wf)
+
+        wanted_orchestrator = ["mock_client_wf"]
+        assert listOrchestrators == wanted_orchestrator
+        assert client_wf._dapr_alternate_name == "mock_client_wf"
+        with self.assertRaises(ValueError) as exception_context:
+            self.runtime_options.register_workflow(self.mock_client_wf, name="test_wf")
+        wf_name = self.mock_client_wf.__name__
+        self.assertEqual(exception_context.exception.args[0],
+                         f'Workflow {wf_name} already registered as mock_client_wf')
+
+        client_act = (self.runtime_options.activity())(self.mock_client_activity)
+        wanted_activity = ["mock_client_activity"]
+        assert listActivities == wanted_activity
+        assert client_act._dapr_alternate_name == "mock_client_activity"
+        with self.assertRaises(ValueError) as exception_context:
+            self.runtime_options.register_activity(self.mock_client_activity, name="test_act")
+        act_name = self.mock_client_activity.__name__
+        self.assertEqual(exception_context.exception.args[0],
+                         f'Activity {act_name} already registered as mock_client_activity')
 
-            runtime_options.register_workflow(self.mock_client_wf)
-            wanted_orchestrator = [self.mock_client_wf.__name__]
-            assert listOrchestrators == wanted_orchestrator
+    def test_decorator_register_optional_name(self):
+        client_wf = (self.runtime_options.workflow(name="test_wf"))(self.mock_client_wf)
+        wanted_orchestrator = ["test_wf"]
+        assert listOrchestrators == wanted_orchestrator
+        assert client_wf._dapr_alternate_name == "test_wf"
 
-            runtime_options.register_activity(self.mock_client_activity)
-            wanted_activity = [self.mock_client_activity.__name__]
-            assert listActivities == wanted_activity
+        client_act = (self.runtime_options.activity(name="test_act"))(self.mock_client_activity)
+        wanted_activity = ["test_act"]
+        assert listActivities == wanted_activity
+        assert client_act._dapr_alternate_name == "test_act"
diff --git a/setup.cfg b/setup.cfg
index 962e70189..cfe3ff888 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -53,8 +53,10 @@ dapr.serializers =
   py.typed
 
 [flake8]
-exclude = 
+exclude =
+  .venv,
   venv,
+  .env,
   build,
   dist,
   .git,
diff --git a/tox.ini b/tox.ini
index acd833317..c6cbff430 100644
--- a/tox.ini
+++ b/tox.ini
@@ -51,6 +51,7 @@ commands =
   ./validate.sh distributed_lock
   ./validate.sh configuration
   ./validate.sh demo_workflow
+  ./validate.sh workflow
   ./validate.sh ../
 commands_pre =
   pip3 install -e {toxinidir}/
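
For reviewers, here is a minimal sketch of how the decorator-based registration introduced by this change is meant to be used end to end. It is condensed from the examples in the diff; the workflow and activity names (`hello_world_wf`, `say_hello`) and the ten-second startup wait are illustrative only:

```python
import time

import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()


@wfr.workflow(name='hello_world_wf')   # the alternate name is optional
def hello_world_wf(ctx: wf.DaprWorkflowContext, who: str):
    greeting = yield ctx.call_activity(say_hello, input=who)
    return greeting


@wfr.activity                          # registered under its function name
def say_hello(ctx, who: str) -> str:
    return f'Hello, {who}!'


if __name__ == '__main__':
    wfr.start()
    time.sleep(10)  # give the workflow runtime time to start

    wf_client = wf.DaprWorkflowClient()
    instance_id = wf_client.schedule_new_workflow(workflow=hello_world_wf, input='Dapr')
    state = wf_client.wait_for_workflow_completion(instance_id)
    print(f'Workflow completed! Status: {state.runtime_status}')

    wfr.shutdown()
```

As in the examples above, functions registered through `@wfr.workflow` / `@wfr.activity` are looked up by their `_dapr_alternate_name` attribute (falling back to `__name__`), so the same function object can be passed to `schedule_new_workflow`, `call_activity`, and `call_child_workflow`.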