Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Retry Policy wrapper for WF #658

Merged
merged 7 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions examples/demo_workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ expected_stdout_lines:
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: Suspended"
- "== APP == Get response from hello_world_wf after resume call: Running"
- "== APP == New counter value is: 111!"
Expand All @@ -56,6 +67,17 @@ You should be able to see the following output:
== APP == Hi Counter!
== APP == New counter value is: 1!
== APP == New counter value is: 11!
== APP == Retry count value is: 0!
== APP == Retry count value is: 1! This print statement verifies retry
== APP == Appending 1 to child_orchestrator_string!
== APP == Appending a to child_orchestrator_string!
== APP == Appending a to child_orchestrator_string!
== APP == Appending 2 to child_orchestrator_string!
== APP == Appending b to child_orchestrator_string!
== APP == Appending b to child_orchestrator_string!
== APP == Appending 3 to child_orchestrator_string!
== APP == Appending c to child_orchestrator_string!
== APP == Appending c to child_orchestrator_string!
== APP == Get response from hello_world_wf after pause call: Suspended
== APP == Get response from hello_world_wf after resume call: Running
== APP == New counter value is: 111!
Expand Down
55 changes: 53 additions & 2 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import timedelta
from time import sleep
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext, RetryPolicy
from dapr.conf import Settings
from dapr.clients import DaprClient
from dapr.clients.exceptions import DaprInternalError

settings = Settings()

counter = 0
retry_count = 0
child_orchestrator_count = 0
child_orchestrator_string = ""
child_act_retry_count = 0
instance_id = "exampleInstanceID"
workflow_component = "dapr"
workflow_name = "hello_world_wf"
Expand All @@ -29,11 +34,20 @@
event_data = "eventData"
non_existent_id_error = "no such instance exists"

# Retry settings shared by this demo: hello_world_wf passes them when calling
# hello_retryable_act and child_wf, and child_wf passes them when calling
# act_for_child_wf.
# Values: first retry interval of 1 second, at most 3 attempts, backoff
# coefficient of 2, retry interval capped at 10 seconds, and an overall
# retry timeout of 100 seconds.
retry_policy=RetryPolicy(first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=3,
backoff_coefficient=2,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(seconds=100)
)


def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
yield ctx.call_child_workflow(workflow=child_wf, retry_policy=retry_policy)
yield ctx.wait_for_external_event("event1")
yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
Expand All @@ -45,11 +59,46 @@ def hello_act(ctx: WorkflowActivityContext, wf_input):
print(f'New counter value is: {counter}!', flush=True)


def hello_retryable_act(ctx: WorkflowActivityContext):
    """Demo activity that fails on even-numbered calls and succeeds on odd ones.

    Every call prints the current value of the module-level ``retry_count`` and
    then increments it. When the value seen on entry is even, the call raises
    ``ValueError`` so that the caller's retry policy has something to retry;
    when it is odd, the call completes normally.

    Args:
        ctx: Activity context supplied by the workflow runtime (unused here).

    Raises:
        ValueError: When ``retry_count`` is even on entry.
    """
    global retry_count
    attempt = retry_count
    should_fail = attempt % 2 == 0
    if should_fail:
        message = f'Retry count value is: {attempt}!'
    else:
        message = f'Retry count value is: {attempt}! This print statement verifies retry'
    print(message, flush=True)
    # Both outcomes count the call exactly once, after the message is printed.
    retry_count = attempt + 1
    if should_fail:
        raise ValueError("Retryable Error")


def child_wf(ctx: DaprWorkflowContext):
    """Demo child workflow that fails until the module-level counter reaches 3.

    When ``ctx.is_replaying`` is false, the workflow increments
    ``child_orchestrator_count``, prints it, and appends it to
    ``child_orchestrator_string``. It then schedules ``act_for_child_wf`` with
    the current count and the shared ``retry_policy``. After the activity
    completes, it raises ``ValueError`` while the count is still below 3.

    Args:
        ctx: Workflow context supplied by the runtime.

    Raises:
        ValueError: While ``child_orchestrator_count`` is less than 3.
    """
    global child_orchestrator_string, child_orchestrator_count
    first_execution = not ctx.is_replaying
    if first_execution:
        # Kept behind the replay guard: the demo expects each number to be
        # appended once (final string "1aa2bb3cc").
        child_orchestrator_count += 1
        print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
        child_orchestrator_string += str(child_orchestrator_count)
    yield ctx.call_activity(
        act_for_child_wf,
        input=child_orchestrator_count,
        retry_policy=retry_policy,
    )
    if child_orchestrator_count >= 3:
        return
    raise ValueError("Retryable Error")


def act_for_child_wf(ctx: WorkflowActivityContext, inp):
    """Demo activity that appends a letter and fails on even-numbered calls.

    Maps ``inp`` to a lowercase letter (1 -> 'a', 2 -> 'b', ...), prints it, and
    appends it to the module-level ``child_orchestrator_string``. The
    module-level ``child_act_retry_count`` is incremented on every call; when
    its value on entry is even, the call raises ``ValueError`` after the
    append has already happened.

    Args:
        ctx: Activity context supplied by the workflow runtime (unused here).
        inp: Integer offset from the character before 'a' (code point 96).

    Raises:
        ValueError: When ``child_act_retry_count`` is even on entry.
    """
    global child_orchestrator_string, child_act_retry_count
    letter = chr(inp + 96)
    print(f'Appending {letter} to child_orchestrator_string!', flush=True)
    child_orchestrator_string = child_orchestrator_string + letter
    call_index = child_act_retry_count
    child_act_retry_count = call_index + 1
    if call_index % 2 == 0:
        raise ValueError("Retryable Error")


def main():
with DaprClient() as d:
workflow_runtime = WorkflowRuntime()
workflow_runtime.register_workflow(hello_world_wf)
workflow_runtime.register_workflow(child_wf)
workflow_runtime.register_activity(hello_act)
workflow_runtime.register_activity(hello_retryable_act)
workflow_runtime.register_activity(act_for_child_wf)
workflow_runtime.start()

sleep(2)
Expand All @@ -62,8 +111,10 @@ def main():
print(f"start_resp {start_resp.instance_id}")

# Sleep for a while to let the workflow run
sleep(1)
sleep(12)
assert counter == 11
assert retry_count == 2
assert child_orchestrator_string == "1aa2bb3cc"

# Pause Test
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
Expand Down
4 changes: 3 additions & 1 deletion ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
from dapr.ext.workflow.workflow_state import WorkflowState, WorkflowStatus
from dapr.ext.workflow.retry_policy import RetryPolicy

__all__ = [
'WorkflowRuntime',
Expand All @@ -29,5 +30,6 @@
'WorkflowStatus',
'when_all',
'when_any',
'alternate_name'
'alternate_name',
'RetryPolicy'
]
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dapr.ext.workflow.workflow_context import WorkflowContext, Workflow
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
from dapr.ext.workflow.logger import LoggerOptions, Logger
from dapr.ext.workflow.retry_policy import RetryPolicy

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand Down Expand Up @@ -58,17 +59,22 @@
return self.__obj.create_timer(fire_at)

def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
                  input: TInput = None,
                  retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]:
    """Schedule an activity on the wrapped orchestration context.

    Args:
        activity: The activity callable. Its ``_dapr_alternate_name`` attribute
            is used as the activity name when present; otherwise its
            ``__name__`` is used.
        input: Optional input forwarded to the activity.
        retry_policy: Optional retry policy. When given, its underlying
            ``obj`` is forwarded to the wrapped context; when ``None``, the
            wrapped call is made without a ``retry_policy`` argument.

    Returns:
        The task returned by the wrapped context's ``call_activity``.
    """
    self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}')
    # A single getattr replaces the hasattr + __dict__ lookup pair, which could
    # raise KeyError when the attribute is visible but not stored in __dict__.
    # The __name__ fallback should ideally never be needed.
    act = getattr(activity, '_dapr_alternate_name', activity.__name__)
    if retry_policy is None:
        return self.__obj.call_activity(activity=act, input=input)
    return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj)

Check warning on line 72 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py#L72

Added line #L72 was not covered by tests

def call_child_workflow(self, workflow: Workflow, *,
input: Optional[TInput],
instance_id: Optional[str]) -> task.Task[TOutput]:
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None) -> task.Task[TOutput]:
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}')

def wf(ctx: task.OrchestrationContext, inp: TInput):
Expand All @@ -81,7 +87,10 @@
else:
# this case should ideally never happen
wf.__name__ = workflow.__name__
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
if retry_policy is None:
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id,

Check warning on line 92 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py#L92

Added line #L92 was not covered by tests
DeepanshuA marked this conversation as resolved.
Show resolved Hide resolved
retry_policy=retry_policy.obj)

def wait_for_external_event(self, name: str) -> task.Task:
self._logger.debug(f'{self.instance_id}: Waiting for external event {name}')
Expand Down
97 changes: 97 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Optional, TypeVar
from datetime import timedelta

from durabletask import task

T = TypeVar('T')
TInput = TypeVar('TInput')
TOutput = TypeVar('TOutput')


class RetryPolicy:
    """Represents the retry policy for a workflow or activity function.

    This is a thin wrapper: the constructor validates its arguments and then
    builds a ``durabletask.task.RetryPolicy`` from them. The wrapped object is
    available through :attr:`obj`, and the read-only properties below delegate
    to it.
    """

    def __init__(
            self, *,
            first_retry_interval: timedelta,
            max_number_of_attempts: int,
            backoff_coefficient: Optional[float] = 1.0,
            max_retry_interval: Optional[timedelta] = None,
            retry_timeout: Optional[timedelta] = None
    ):
        """Creates a new RetryPolicy instance.

        Args:
            first_retry_interval(timedelta): The retry interval to use for the first retry attempt.
            max_number_of_attempts(int): The maximum number of retry attempts.
            backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating
                the next retry interval.
            max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any
                retry attempt.
            retry_timeout(Optional[timedelta]): The maximum amount of time to spend retrying the
                operation.

        Raises:
            ValueError: If an interval or timeout is negative, if
                ``max_number_of_attempts`` is below 1, or if
                ``backoff_coefficient`` is below 1.
        """
        # Validate inputs before constructing the wrapped object.
        zero = timedelta(seconds=0)
        if first_retry_interval < zero:
            raise ValueError('first_retry_interval must be >= 0')
        if max_number_of_attempts < 1:
            raise ValueError('max_number_of_attempts must be >= 1')
        if backoff_coefficient is not None and backoff_coefficient < 1:
            raise ValueError('backoff_coefficient must be >= 1')
        if max_retry_interval is not None and max_retry_interval < zero:
            raise ValueError('max_retry_interval must be >= 0')
        if retry_timeout is not None and retry_timeout < zero:
            raise ValueError('retry_timeout must be >= 0')

        self._obj = task.RetryPolicy(
            first_retry_interval=first_retry_interval,
            max_number_of_attempts=max_number_of_attempts,
            backoff_coefficient=backoff_coefficient,
            max_retry_interval=max_retry_interval,
            retry_timeout=retry_timeout
        )

    @property
    def obj(self) -> 'task.RetryPolicy':
        """Returns the underlying RetryPolicy object."""
        return self._obj

    # The getters below read the wrapped object's public properties rather than
    # its private ``_xxx`` attributes, so this wrapper does not depend on the
    # wrapped class's internal attribute names.

    @property
    def first_retry_interval(self) -> timedelta:
        """The retry interval to use for the first retry attempt."""
        return self._obj.first_retry_interval

    @property
    def max_number_of_attempts(self) -> int:
        """The maximum number of retry attempts."""
        return self._obj.max_number_of_attempts

    @property
    def backoff_coefficient(self) -> Optional[float]:
        """The backoff coefficient to use for calculating the next retry interval."""
        return self._obj.backoff_coefficient

    @property
    def max_retry_interval(self) -> Optional[timedelta]:
        """The maximum retry interval to use for any retry attempt."""
        return self._obj.max_retry_interval

    @property
    def retry_timeout(self) -> Optional[timedelta]:
        """The maximum amount of time to spend retrying the operation."""
        return self._obj.retry_timeout

Check warning on line 97 in ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py#L97

Added line #L97 was not covered by tests
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_workflow_context_functions(self):
assert call_activity_result == mock_call_activity

call_sub_orchestrator_result = dapr_wf_ctx.call_child_workflow(
self.mock_client_child_wf, input=None, instance_id=None)
self.mock_client_child_wf)
assert call_sub_orchestrator_result == mock_call_sub_orchestrator

create_timer_result = dapr_wf_ctx.create_timer(mock_date_time)
Expand Down
Loading