Skip to content

Commit

Permalink
add purge workflow function
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting committed Dec 3, 2024
1 parent 8399ac9 commit f01e223
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
14 changes: 12 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def raise_workflow_event(
"""
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True):
"""Terminates a running workflow instance and updates its runtime status to
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
the task hub. When the task hub worker processes this message, it will update the runtime
Expand All @@ -226,9 +226,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
Args:
instance_id: The ID of the workflow instance to terminate.
output: The optional output to set for the terminated workflow instance.
recursive: The optional flag to terminate all child workflows.
"""
return self.__obj.terminate_orchestration(instance_id, output=output)
return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive)

def pause_workflow(self, instance_id: str):
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
Expand All @@ -246,3 +247,12 @@ def resume_workflow(self, instance_id: str):
instance_id: The instance ID of the workflow to resume.
"""
return self.__obj.resume_orchestration(instance_id)

def purge_workflow(self, instance_id: str, recursive: bool = True):
"""Purge data from a workflow instance.
Args:
instance_id: The instance ID of the workflow to purge.
recursive: The optional flag to also purge data from all child workflows.
"""
return self.__obj.purge_orchestration(instance_id, recursive)
7 changes: 7 additions & 0 deletions ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
mock_terminate_result = 'terminate001'
mock_suspend_result = 'suspend001'
mock_resume_result = 'resume001'
mock_purge_result = 'purge001'
mockInstanceId = 'instance001'


Expand Down Expand Up @@ -58,6 +59,9 @@ def suspend_orchestration(self, instance_id: str):
def resume_orchestration(self, instance_id: str):
return mock_resume_result

def purge_workflow(self, instance_id: str, recursive: bool = True):
return mock_purge_result

def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus):
return client.OrchestrationState(
instance_id=instance_id,
Expand Down Expand Up @@ -119,3 +123,6 @@ def test_client_functions(self):

actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId)
assert actual_resume_result == mock_resume_result

actual_purge_result = wfClient.purge_workflow(instance_id=mockInstanceId)
assert actual_purge_result == mock_purge_result

0 comments on commit f01e223

Please sign in to comment.