From f01e2230a3a8a8dc672d0d50550c81dea38a8198 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Tue, 3 Dec 2024 10:38:43 +0100 Subject: [PATCH] add purge workflow function Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- .../dapr/ext/workflow/dapr_workflow_client.py | 14 ++++++++++++-- .../tests/test_workflow_client.py | 7 +++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 19f49981..50a03423 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -208,7 +208,7 @@ def raise_workflow_event( """ return self.__obj.raise_orchestration_event(instance_id, event_name, data=data) - def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None): + def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True): """Terminates a running workflow instance and updates its runtime status to WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in the task hub. When the task hub worker processes this message, it will update the runtime @@ -226,9 +226,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None): Args: instance_id: The ID of the workflow instance to terminate. output: The optional output to set for the terminated workflow instance. + recursive: The optional flag to terminate all child workflows. """ - return self.__obj.terminate_orchestration(instance_id, output=output) + return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive) def pause_workflow(self, instance_id: str): """Suspends a workflow instance, halting processing of it until resume_workflow is used to @@ -246,3 +247,12 @@ def resume_workflow(self, instance_id: str): instance_id: The instance ID of the workflow to resume. """ return self.__obj.resume_orchestration(instance_id) + + def purge_workflow(self, instance_id: str, recursive: bool = True): + """Purge data from a workflow instance. + + Args: + instance_id: The instance ID of the workflow to purge. + recursive: The optional flag to also purge data from all child workflows. + """ + return self.__obj.purge_orchestration(instance_id, recursive) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 4a7f93b9..f52531da 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -26,6 +26,7 @@ mock_terminate_result = 'terminate001' mock_suspend_result = 'suspend001' mock_resume_result = 'resume001' +mock_purge_result = 'purge001' mockInstanceId = 'instance001' @@ -58,6 +59,9 @@ def suspend_orchestration(self, instance_id: str): def resume_orchestration(self, instance_id: str): return mock_resume_result + def purge_workflow(self, instance_id: str, recursive: bool = True): + return mock_purge_result + def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus): return client.OrchestrationState( instance_id=instance_id, @@ -119,3 +123,6 @@ def test_client_functions(self): actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId) assert actual_resume_result == mock_resume_result + + actual_purge_result = wfClient.purge_workflow(instance_id=mockInstanceId) + assert actual_purge_result == mock_purge_result