diff --git a/.gitignore b/.gitignore index 659b0e14..0a96e906 100644 --- a/.gitignore +++ b/.gitignore @@ -107,5 +107,5 @@ venv.bak/ # mypy .mypy_cache/ -# OSX specific files +# macOS specific files .DS_Store diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py index 879e4f46..892f7b95 100644 --- a/examples/demo_workflow/app.py +++ b/examples/demo_workflow/app.py @@ -19,43 +19,48 @@ settings = Settings() counter = 0 -instanceId = "exampleInstanceID" -workflowComponent = "dapr" -workflowName = "hello_world_wf" -inputData = "Hi Counter!" -workflowOptions = dict() -workflowOptions["task_queue"] = "testQueue" -eventName = "event1" -eventData = "eventData" -nonExistentIDError = "no such instance exists" - -def hello_world_wf(ctx: DaprWorkflowContext, input): - print(f'{input}') - yield ctx.call_activity(hello_act, input=1) - yield ctx.call_activity(hello_act, input=10) +instance_id = "exampleInstanceID" +workflow_component = "dapr" +workflow_name = "hello_world_wf" +input_data = "Hi Counter!" +workflow_options = dict() +workflow_options["task_queue"] = "testQueue" +event_name = "event1" +event_data = "eventData" +non_existent_id_error = "no such instance exists" + + +def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + print(f'{wf_input}') + yield ctx.call_activity(hello_act, wf_input=1) + yield ctx.call_activity(hello_act, wf_input=10) yield ctx.wait_for_external_event("event1") - yield ctx.call_activity(hello_act, input=100) - yield ctx.call_activity(hello_act, input=1000) + yield ctx.call_activity(hello_act, wf_input=100) + yield ctx.call_activity(hello_act, wf_input=1000) -def hello_act(ctx: WorkflowActivityContext, input): + +def hello_act(ctx: WorkflowActivityContext, wf_input): global counter - counter += input + counter += wf_input print(f'New counter value is: {counter}!', flush=True) + def main(): with DaprClient() as d: host = settings.DAPR_RUNTIME_HOST port = settings.DAPR_GRPC_PORT - workflowRuntime = WorkflowRuntime(host, port) - workflowRuntime.register_workflow(hello_world_wf) - workflowRuntime.register_activity(hello_act) - workflowRuntime.start() + workflow_runtime = WorkflowRuntime(host, port) + workflow_runtime.register_workflow(hello_world_wf) + workflow_runtime.register_activity(hello_act) + workflow_runtime.start() sleep(2) print("==========Start Counter Increase as per Input:==========") - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) + start_resp = d.start_workflow(instance_id=instance_id, + workflow_component=workflow_component, + workflow_name=workflow_name, input=input_data, + workflow_options=workflow_options) print(f"start_resp {start_resp.instance_id}") # Sleep for a while to let the workflow run @@ -63,52 +68,58 @@ def main(): assert counter == 11 # Pause Test - d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}") + d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component) + get_response = d.get_workflow(instance_id=instance_id, + workflow_component=workflow_component) + print(f"Get response from {workflow_name} after pause call: {get_response.runtime_status}") # Resume Test - d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}") + d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component) + get_response = d.get_workflow(instance_id=instance_id, + workflow_component=workflow_component) + print(f"Get response from {workflow_name} after resume call: {get_response.runtime_status}") sleep(1) # Raise event - d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent, - event_name=eventName, event_data=eventData) + d.raise_workflow_event(instance_id=instance_id, workflow_component=workflow_component, + event_name=event_name, event_data=event_data) sleep(5) # Purge Test - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) + d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component) try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) + d.get_workflow(instance_id=instance_id, workflow_component=workflow_component) except DaprInternalError as err: - if nonExistentIDError in err._message: + if non_existent_id_error in err._message: print("Instance Successfully Purged") - - # Kick off another workflow for termination purposes + # Kick off another workflow for termination purposes # This will also test using the same instance ID on a new workflow after # the old instance was purged - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) + start_resp = d.start_workflow(instance_id=instance_id, + workflow_component=workflow_component, + workflow_name=workflow_name, input=input_data, + workflow_options=workflow_options) print(f"start_resp {start_resp.instance_id}") # Terminate Test - d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent) + d.terminate_workflow(instance_id=instance_id, workflow_component=workflow_component) sleep(1) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}") + get_response = d.get_workflow(instance_id=instance_id, + workflow_component=workflow_component) + print( + f"Get response from {workflow_name} after terminate call: {get_response.runtime_status}") # Purge Test - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) + d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component) try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) + d.get_workflow(instance_id=instance_id, workflow_component=workflow_component) except DaprInternalError as err: - if nonExistentIDError in err._message: + if non_existent_id_error in err._message: print("Instance Successfully Purged") - workflowRuntime.shutdown() + workflow_runtime.shutdown() + if __name__ == '__main__': main() diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/util.py b/ext/dapr-ext-workflow/dapr/ext/workflow/util.py index 327969d7..9a8cad83 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/util.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/util.py @@ -19,7 +19,12 @@ def getAddress(host: Optional[str] = None, port: Optional[str] = None) -> str: - host = host or settings.DAPR_RUNTIME_HOST - port = port or settings.DAPR_GRPC_PORT - address = f"{host}:{port}" + if not host and not port: + address = settings.DAPR_GRPC_ENDPOINT or (f"{settings.DAPR_RUNTIME_HOST}:" + f"{settings.DAPR_GRPC_PORT}") + else: + host = host or settings.DAPR_RUNTIME_HOST + port = port or settings.DAPR_GRPC_PORT + address = f"{host}:{port}" + return address diff --git a/ext/dapr-ext-workflow/tests/test_workflow_util.py b/ext/dapr-ext-workflow/tests/test_workflow_util.py new file mode 100644 index 00000000..2fcab61d --- /dev/null +++ b/ext/dapr-ext-workflow/tests/test_workflow_util.py @@ -0,0 +1,30 @@ +import unittest +from dapr.ext.workflow.util import getAddress +from unittest.mock import patch + +from dapr.conf import settings + + +class DaprWorkflowUtilTest(unittest.TestCase): + + def test_get_address_default(self): + expected = f"{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}" + self.assertEqual(expected, getAddress()) + + def test_get_address_with_constructor_arguments(self): + self.assertEqual("test.com:5000", getAddress("test.com", "5000")) + + def test_get_address_with_partial_constructor_arguments(self): + expected = f"{settings.DAPR_RUNTIME_HOST}:5000" + self.assertEqual(expected, getAddress(port="5000")) + + expected = f"test.com:{settings.DAPR_GRPC_PORT}" + self.assertEqual(expected, getAddress(host="test.com")) + + @patch.object(settings, "DAPR_GRPC_ENDPOINT", "https://domain1.com:5000") + def test_get_address_with_constructor_arguments_and_env_variable(self): + self.assertEqual("test.com:5000", getAddress("test.com", "5000")) + + @patch.object(settings, "DAPR_GRPC_ENDPOINT", "https://domain1.com:5000") + def test_get_address_with_env_variable(self): + self.assertEqual("https://domain1.com:5000", getAddress())