Skip to content

Commit

Permalink
support reuse id policy
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting committed Dec 3, 2024
1 parent f01e223 commit 8b478a7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
1 change: 1 addition & 0 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def send_alert(ctx, message: str):
except Exception:
pass
if not status or status.runtime_status.name != 'RUNNING':
# TODO update to use reuse_id_policy
instance_id = wf_client.schedule_new_workflow(
workflow=status_monitor_workflow,
input=JobStatus(job_id=job_id, is_healthy=True),
Expand Down
10 changes: 8 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

from __future__ import annotations
from datetime import datetime
from typing import Any, Optional, TypeVar
from typing import Any, Optional, TypeVar, Union


from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb

from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
Expand Down Expand Up @@ -78,6 +80,7 @@ def schedule_new_workflow(
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
) -> str:
"""Schedules a new workflow instance for execution.
Expand All @@ -90,6 +93,8 @@ def schedule_new_workflow(
start_at: The time when the workflow instance should start executing.
If not specified or if a date-time in the past is specified, the workflow instance will
be scheduled immediately.
reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with
an existing workflow instance.
Returns:
The ID of the scheduled workflow instance.
Expand All @@ -100,9 +105,10 @@ def schedule_new_workflow(
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=Union(reuse_id_policy),
)
return self.__obj.schedule_new_orchestration(
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at, reuse_id_policy=Union(reuse_id_policy),
)

def get_workflow_state(
Expand Down

0 comments on commit 8b478a7

Please sign in to comment.