Skip to content

Commit

Permalink
Initial code for decorators and optional naming of workflows and activities (dapr#651)
Browse files Browse the repository at this point in the history

* initial code for decorators and optional naming of workflows and activities

Signed-off-by: Mukundan Sundararajan <[email protected]>

* add alternate_name decorator

Signed-off-by: Mukundan Sundararajan <[email protected]>

* fix linter

Signed-off-by: Mukundan Sundararajan <[email protected]>

* address review comments.

Signed-off-by: Mukundan Sundararajan <[email protected]>

* address review comments.

Signed-off-by: Mukundan Sundararajan <[email protected]>

---------

Signed-off-by: Mukundan Sundararajan <[email protected]>
  • Loading branch information
mukundansundar authored and litan1106 committed Jan 10, 2024
1 parent 204fe71 commit 26c5bfb
Show file tree
Hide file tree
Showing 14 changed files with 410 additions and 53 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ cd python-sdk
pip3 install -e .
pip3 install -e ./ext/dapr-ext-grpc/
pip3 install -e ./ext/dapr-ext-fastapi/
pip3 install -e ./ext/dapr-ext-workflow/
```

3. Install required packages
Expand Down
39 changes: 39 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,20 @@ Each of the examples in this directory can be run directly from the command line
### Task Chaining

This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:
<!--STEP
name: Run the task chaining example
expected_stdout_lines:
- "== APP == Step 1: Received input: 42."
- "== APP == Step 2: Received input: 43."
- "== APP == Step 3: Received input: 86."
- "== APP == Workflow completed! Status: WorkflowStatus.COMPLETED"
timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py
```
<!--END_STEP-->

The output of this example should look like this:

Expand All @@ -41,9 +51,38 @@ The output of this example should look like this:

This example demonstrates how to fan-out a workflow into multiple parallel tasks, and then fan-in the results of those tasks. You can run this sample using the following command:

<!--STEP
name: Run the fan-out/fan-in example
match_order: none
expected_stdout_lines:
- "== APP == Processing work item: 1."
- "== APP == Processing work item: 2."
- "== APP == Processing work item: 3."
- "== APP == Processing work item: 4."
- "== APP == Processing work item: 5."
- "== APP == Processing work item: 6."
- "== APP == Processing work item: 7."
- "== APP == Processing work item: 8."
- "== APP == Processing work item: 9."
- "== APP == Processing work item: 10."
- "== APP == Work item 1 processed. Result: 2."
- "== APP == Work item 2 processed. Result: 4."
- "== APP == Work item 3 processed. Result: 6."
- "== APP == Work item 4 processed. Result: 8."
- "== APP == Work item 5 processed. Result: 10."
- "== APP == Work item 6 processed. Result: 12."
- "== APP == Work item 7 processed. Result: 14."
- "== APP == Work item 8 processed. Result: 16."
- "== APP == Work item 9 processed. Result: 18."
- "== APP == Work item 10 processed. Result: 20."
- "== APP == Final result: 110."
timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py
```
<!--END_STEP-->

The output of this sample should look like this:

Expand Down
16 changes: 9 additions & 7 deletions examples/workflow/child_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()


@wfr.workflow
def main_workflow(ctx: wf.DaprWorkflowContext):
try:
instance_id = ctx.instance_id
child_instance_id = instance_id + '-child'
print(f'*** Calling child workflow {child_instance_id}')
print(f'*** Calling child workflow {child_instance_id}', flush=True)
yield ctx.call_child_workflow(
workflow=child_workflow, input=None, instance_id=child_instance_id
)
Expand All @@ -28,21 +31,20 @@ def main_workflow(ctx: wf.DaprWorkflowContext):
return


@wfr.workflow
def child_workflow(ctx: wf.DaprWorkflowContext):
    """Child workflow started by main_workflow; logs its own instance id."""
    # flush=True so the line shows up promptly in the `dapr run` log stream
    print(f'*** Child workflow {ctx.instance_id} called', flush=True)


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime('localhost', '50001')
workflowRuntime.register_workflow(main_workflow)
workflowRuntime.register_workflow(child_workflow)
workflowRuntime.start()
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=main_workflow)

# Wait for the workflow to complete
time.sleep(5)

workflowRuntime.shutdown()
wfr.shutdown()
16 changes: 9 additions & 7 deletions examples/workflow/fan_out_fan_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
from typing import List
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()


@wfr.workflow(name='batch_processing')
def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
# get a batch of N work items to process in parallel
work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
Expand All @@ -30,10 +33,12 @@ def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
yield ctx.call_activity(process_results, input=total)


@wfr.activity(name='get_batch')
def get_work_batch(ctx, batch_size: int) -> List[int]:
    """Produce the batch of work item ids 1..batch_size (registered as 'get_batch')."""
    return list(range(1, batch_size + 1))


@wfr.activity
def process_work_item(ctx, work_item: int) -> int:
print(f'Processing work item: {work_item}.')
time.sleep(5)
Expand All @@ -42,21 +47,18 @@ def process_work_item(ctx, work_item: int) -> int:
return result


@wfr.activity(name='final_process')
def process_results(ctx, final_result: int):
    """Log the aggregated fan-in total (activity registered as 'final_process')."""
    print(f'Final result: {final_result}.')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime('localhost', '50001')
workflowRuntime.register_workflow(batch_processing_workflow)
workflowRuntime.register_activity(get_work_batch)
workflowRuntime.register_activity(process_work_item)
workflowRuntime.register_activity(process_results)
workflowRuntime.start()
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=batch_processing_workflow, input=10)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)

workflowRuntime.shutdown()
wfr.shutdown()
15 changes: 8 additions & 7 deletions examples/workflow/human_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from dapr.clients import DaprClient
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()


@dataclass
class Order:
Expand All @@ -38,6 +40,7 @@ def from_dict(dict):
return Approval(**dict)


@wfr.workflow(name='purchase_order_wf')
def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
# Orders under $1000 are auto-approved
if order.cost < 1000:
Expand All @@ -59,10 +62,12 @@ def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
return f"Approved by '{approval_details.approver}'"


@wfr.activity(name='send_approval')
def send_approval_request(_, order: Order) -> None:
    """Simulate notifying an approver about a pending order (name 'send_approval')."""
    print(f'*** Requesting approval from user for order: {order}')


@wfr.activity
def place_order(_, order: Order) -> None:
    """Simulate submitting the approved order for fulfilment."""
    print(f'*** Placing order: {order}')

Expand All @@ -76,12 +81,8 @@ def place_order(_, order: Order) -> None:
parser.add_argument('--timeout', type=int, default=60, help='Timeout in seconds')
args = parser.parse_args()

# configure and start the workflow runtime
workflowRuntime = wf.WorkflowRuntime('localhost', '50001')
workflowRuntime.register_workflow(purchase_order_workflow)
workflowRuntime.register_activity(send_approval_request)
workflowRuntime.register_activity(place_order)
workflowRuntime.start()
# start the workflow runtime
wfr.start()

# Start a purchase order workflow using the user input
order = Order(args.cost, 'MyProduct', 1)
Expand Down Expand Up @@ -118,4 +119,4 @@ def prompt_for_approval():
except TimeoutError:
print('*** Workflow timed out!')

workflowRuntime.shutdown()
wfr.shutdown()
15 changes: 9 additions & 6 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@
from dataclasses import dataclass
from datetime import timedelta
import random
from time import sleep
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()


@dataclass
class JobStatus:
    """Payload passed to the status-monitor workflow (and via continue_as_new)."""

    # identifier of the job being monitored
    job_id: str
    # last observed health state of the job
    is_healthy: bool


@wfr.workflow(name='status_monitor')
def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
# poll a status endpoint associated with this job
status = yield ctx.call_activity(check_status, input=job)
Expand All @@ -43,20 +47,19 @@ def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
ctx.continue_as_new(job)


@wfr.activity
def check_status(ctx, _) -> str:
    """Simulate polling a job's health endpoint: randomly 'healthy' or 'unhealthy'."""
    outcomes = ['healthy', 'unhealthy']
    return random.choice(outcomes)


@wfr.activity
def send_alert(ctx, message: str):
    """Emit an alert for an unhealthy job; this sample just prints it."""
    print(f'*** Alert: {message}')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime()
workflowRuntime.register_workflow(status_monitor_workflow)
workflowRuntime.register_activity(check_status)
workflowRuntime.register_activity(send_alert)
workflowRuntime.start()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
job_id = 'job1'
Expand All @@ -76,4 +79,4 @@ def send_alert(ctx, message: str):
print(f'Workflow already running. Instance ID: {job_id}')

input('Press Enter to stop...\n')
workflowRuntime.shutdown()
wfr.shutdown()
21 changes: 13 additions & 8 deletions examples/workflow/task_chaining.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from time import sleep

import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime()


@wfr.workflow(name='random_workflow')
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
Expand All @@ -24,42 +30,41 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
return [result1, result2, result3]


@wfr.activity(name='step10')
def step1(ctx, activity_input):
    """First step of the chain: log the input and return it incremented by one.

    NOTE(review): this activity is registered under the alternate name
    'step10' — presumably to demonstrate the naming decorator; confirm
    it is intentional and not a typo for 'step1'.
    """
    print(f'Step 1: Received input: {activity_input}.')
    next_value = activity_input + 1
    return next_value


@wfr.activity
def step2(ctx, activity_input):
    """Second step of the chain: log the input and return its double."""
    print(f'Step 2: Received input: {activity_input}.')
    return 2 * activity_input


@wfr.activity
def step3(ctx, activity_input):
    """Third step of the chain: log the input and return `input XOR 2`.

    NOTE(review): `^` is bitwise XOR, not exponentiation — the README's
    expected output matches XOR, so behavior is kept; confirm squaring
    was not the original intent.
    """
    print(f'Step 3: Received input: {activity_input}.')
    return activity_input ^ 2


@wfr.activity
def error_handler(ctx, error):
    """Compensating activity for a failed chain: log the error details."""
    print(f'Executing error handler: {error}.')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime('localhost', '50001')
workflowRuntime.register_workflow(task_chain_workflow)
workflowRuntime.register_activity(step1)
workflowRuntime.register_activity(step2)
workflowRuntime.register_activity(step3)
workflowRuntime.register_activity(error_handler)
workflowRuntime.start()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')

workflowRuntime.shutdown()
wfr.shutdown()
3 changes: 2 additions & 1 deletion ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""

# Import your main classes here
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
Expand All @@ -29,4 +29,5 @@
'WorkflowStatus',
'when_all',
'when_any',
'alternate_name',
]
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ def schedule_new_workflow(
Returns:
The ID of the scheduled workflow instance.
"""
if hasattr(workflow, '_dapr_alternate_name'):
return self.__obj.schedule_new_orchestration(
workflow.__dict__['_dapr_alternate_name'],
input=input,
instance_id=instance_id,
start_at=start_at,
)
return self.__obj.schedule_new_orchestration(
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at
)
Expand Down
12 changes: 11 additions & 1 deletion ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def call_activity(
*,
input: TInput = None,
) -> task.Task[TOutput]:
if hasattr(activity, '_dapr_alternate_name'):
return self.__obj.call_activity(
activity=activity.__dict__['_dapr_alternate_name'], input=input
)
# this return should ideally never execute
return self.__obj.call_activity(activity=activity.__name__, input=input)

def call_child_workflow(
Expand All @@ -67,7 +72,12 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
return workflow(daprWfContext, inp)

# copy workflow name so durabletask.worker can find the orchestrator in its registry
wf.__name__ = workflow.__name__

if hasattr(workflow, '_dapr_alternate_name'):
wf.__name__ = workflow.__dict__['_dapr_alternate_name']
else:
# this case should ideally never happen
wf.__name__ = workflow.__name__
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)

def wait_for_external_event(self, name: str) -> task.Task:
Expand Down
Loading

0 comments on commit 26c5bfb

Please sign in to comment.