Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial code for decorators and optional naming of workflows and activities #651

Merged
merged 6 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ cd python-sdk
pip3 install -e .
pip3 install -e ./ext/dapr-ext-grpc/
pip3 install -e ./ext/dapr-ext-fastapi/
pip3 install -e ./ext/dapr-ext-workflow/
```

3. Install required packages
Expand Down
39 changes: 39 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,20 @@ Each of the examples in this directory can be run directly from the command line
### Task Chaining

This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:
<!--STEP
mukundansundar marked this conversation as resolved.
Show resolved Hide resolved
name: Run the task chaining example
expected_stdout_lines:
- "== APP == Step 1: Received input: 42."
- "== APP == Step 2: Received input: 43."
- "== APP == Step 3: Received input: 86."
- "== APP == Workflow completed! Status: WorkflowStatus.COMPLETED"
timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py
```
<!--END_STEP-->

The output of this example should look like this:

Expand All @@ -41,9 +51,38 @@ The output of this example should look like this:

This example demonstrates how to fan-out a workflow into multiple parallel tasks, and then fan-in the results of those tasks. You can run this sample using the following command:

<!--STEP
name: Run the fan-out/fan-in example
match_order: none
expected_stdout_lines:
- "== APP == Processing work item: 1."
- "== APP == Processing work item: 2."
- "== APP == Processing work item: 3."
- "== APP == Processing work item: 4."
- "== APP == Processing work item: 5."
- "== APP == Processing work item: 6."
- "== APP == Processing work item: 7."
- "== APP == Processing work item: 8."
- "== APP == Processing work item: 9."
- "== APP == Processing work item: 10."
- "== APP == Work item 1 processed. Result: 2."
- "== APP == Work item 2 processed. Result: 4."
- "== APP == Work item 3 processed. Result: 6."
- "== APP == Work item 4 processed. Result: 8."
- "== APP == Work item 5 processed. Result: 10."
- "== APP == Work item 6 processed. Result: 12."
- "== APP == Work item 7 processed. Result: 14."
- "== APP == Work item 8 processed. Result: 16."
- "== APP == Work item 9 processed. Result: 18."
- "== APP == Work item 10 processed. Result: 20."
- "== APP == Final result: 110."
timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py
```
<!--END_STEP-->

The output of this sample should look like this:

Expand Down
23 changes: 12 additions & 11 deletions examples/workflow/child_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,33 @@
import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()

@wfr.workflow
def main_workflow(ctx: wf.DaprWorkflowContext):
try:
instance_id = ctx.instance_id
child_instance_id = instance_id + '-child'
print(f'*** Calling child workflow {child_instance_id}')
print(f'*** Calling child workflow {child_instance_id}', flush=True)
yield ctx.call_child_workflow(workflow=child_workflow,input=None,instance_id=child_instance_id)
except Exception as e:
print(f'*** Exception: {e}')

return

@wfr.workflow
def child_workflow(ctx: wf.DaprWorkflowContext):
instance_id = ctx.instance_id
print(f'*** Child workflow {instance_id} called')
print(f'*** Child workflow {instance_id} called', flush=True)

if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(main_workflow)
workflowRuntime.register_workflow(child_workflow)
workflowRuntime.start()
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
workflow=main_workflow)
instance_id = wf_client.schedule_new_workflow(workflow=main_workflow)

# Wait for the workflow to complete
time.sleep(5)
workflowRuntime.shutdown()

wfr.shutdown()
18 changes: 8 additions & 10 deletions examples/workflow/fan_out_fan_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from typing import List
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()

@wfr.workflow(name="batch_processing")
def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
# get a batch of N work items to process in parallel
work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
Expand All @@ -27,30 +29,26 @@ def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
total = sum(outputs)
yield ctx.call_activity(process_results, input=total)


@wfr.activity(name="get_batch")
def get_work_batch(ctx, batch_size: int) -> List[int]:
return [i + 1 for i in range(batch_size)]


@wfr.activity
def process_work_item(ctx, work_item: int) -> int:
print(f'Processing work item: {work_item}.')
time.sleep(5)
result = work_item * 2
print(f'Work item {work_item} processed. Result: {result}.')
return result


@wfr.activity(name="final_process")
def process_results(ctx, final_result: int):
print(f'Final result: {final_result}.')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(batch_processing_workflow)
workflowRuntime.register_activity(get_work_batch)
workflowRuntime.register_activity(process_work_item)
workflowRuntime.register_activity(process_results)
workflowRuntime.start()
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
Expand All @@ -59,4 +57,4 @@ def process_results(ctx, final_result: int):
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)

workflowRuntime.shutdown()
wfr.shutdown()
mukundansundar marked this conversation as resolved.
Show resolved Hide resolved
15 changes: 7 additions & 8 deletions examples/workflow/human_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dapr.clients import DaprClient
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()

@dataclass
class Order:
Expand All @@ -37,7 +38,7 @@ class Approval:
def from_dict(dict):
return Approval(**dict)


@wfr.workflow(name="purchase_order_wf")
def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
# Orders under $1000 are auto-approved
if order.cost < 1000:
Expand All @@ -59,10 +60,12 @@ def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
return f"Approved by '{approval_details.approver}'"


@wfr.activity(name="send_approval")
def send_approval_request(_, order: Order) -> None:
print(f'*** Requesting approval from user for order: {order}')


@wfr.activity
def place_order(_, order: Order) -> None:
print(f'*** Placing order: {order}')

Expand All @@ -76,12 +79,8 @@ def place_order(_, order: Order) -> None:
parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds")
args = parser.parse_args()

# configure and start the workflow runtime
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(purchase_order_workflow)
workflowRuntime.register_activity(send_approval_request)
workflowRuntime.register_activity(place_order)
workflowRuntime.start()
# start the workflow runtime
wfr.start()

# Start a purchase order workflow using the user input
order = Order(args.cost, "MyProduct", 1)
Expand Down Expand Up @@ -119,4 +118,4 @@ def prompt_for_approval():
except TimeoutError:
print("*** Workflow timed out!")

workflowRuntime.shutdown()
wfr.shutdown()
15 changes: 8 additions & 7 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
from dataclasses import dataclass
from datetime import timedelta
import random
from time import sleep
import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime()
@dataclass
class JobStatus:
job_id: str
is_healthy: bool


@wfr.workflow(name="status_monitor")
def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
# poll a status endpoint associated with this job
status = yield ctx.call_activity(check_status, input=job)
Expand All @@ -43,20 +45,19 @@ def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
ctx.continue_as_new(job)


@wfr.activity
def check_status(ctx, _) -> str:
return random.choice(["healthy", "unhealthy"])


@wfr.activity
def send_alert(ctx, message: str):
print(f'*** Alert: {message}')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime()
workflowRuntime.register_workflow(status_monitor_workflow)
workflowRuntime.register_activity(check_status)
workflowRuntime.register_activity(send_alert)
workflowRuntime.start()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
job_id = "job1"
Expand All @@ -75,4 +76,4 @@ def send_alert(ctx, message: str):
print(f'Workflow already running. Instance ID: {job_id}')

input("Press Enter to stop...\n")
workflowRuntime.shutdown()
wfr.shutdown()
20 changes: 12 additions & 8 deletions examples/workflow/task_chaining.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from time import sleep

import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime()

@wfr.workflow(name="random_workflow")
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
Expand All @@ -24,37 +29,36 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
return [result1, result2, result3]


@wfr.activity(name="step10")
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1


@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2


@wfr.activity
def step3(ctx, activity_input):
print(f'Step 3: Received input: {activity_input}.')
# Do some work
return activity_input ^ 2


@wfr.activity
def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Do some compensating work


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(task_chain_workflow)
workflowRuntime.register_activity(step1)
workflowRuntime.register_activity(step2)
workflowRuntime.register_activity(step3)
workflowRuntime.register_activity(error_handler)
workflowRuntime.start()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
Expand All @@ -64,4 +68,4 @@ def error_handler(ctx, error):
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')

workflowRuntime.shutdown()
wfr.shutdown()
5 changes: 3 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""

# Import your main classes here
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
Expand All @@ -28,5 +28,6 @@
'WorkflowState',
'WorkflowStatus',
'when_all',
'when_any'
'when_any',
'alternate_name'
]
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@
Returns:
The ID of the scheduled workflow instance.
"""
if hasattr(workflow, '_dapr_alternate_name'):
return self.__obj.schedule_new_orchestration(workflow.__dict__['_dapr_alternate_name'],

Check warning on line 79 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py#L79

Added line #L79 was not covered by tests
input=input, instance_id=instance_id,
start_at=start_at)
return self.__obj.schedule_new_orchestration(workflow.__name__, input=input,
instance_id=instance_id, start_at=start_at)
instance_id=instance_id,
start_at=start_at)

def get_workflow_state(self, instance_id: str, *,
fetch_payloads: bool = True) -> Optional[WorkflowState]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@

def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
input: TInput = None) -> task.Task[TOutput]:
if hasattr(activity, '_dapr_alternate_name'):
return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'],

Check warning on line 57 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py#L57

Added line #L57 was not covered by tests
input=input)
# this return should ideally never execute
mukundansundar marked this conversation as resolved.
Show resolved Hide resolved
return self.__obj.call_activity(activity=activity.__name__, input=input)

def call_child_workflow(self, workflow: Workflow, *,
Expand All @@ -62,7 +66,12 @@
daprWfContext = DaprWorkflowContext(ctx)
return workflow(daprWfContext, inp)
# copy workflow name so durabletask.worker can find the orchestrator in its registry
wf.__name__ = workflow.__name__

if hasattr(workflow, '_dapr_alternate_name'):
wf.__name__ = workflow.__dict__['_dapr_alternate_name']

Check warning on line 71 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py#L71

Added line #L71 was not covered by tests
else:
# this case should ideally never happen
wf.__name__ = workflow.__name__
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)

def wait_for_external_event(self, name: str) -> task.Task:
Expand Down
Loading
Loading