Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS support for workflows #632

Merged
merged 4 commits on Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, wf_input=1)
yield ctx.call_activity(hello_act, wf_input=10)
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.wait_for_external_event("event1")
yield ctx.call_activity(hello_act, wf_input=100)
yield ctx.call_activity(hello_act, wf_input=1000)
yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)


def hello_act(ctx: WorkflowActivityContext, wf_input):
Expand All @@ -47,9 +47,7 @@ def hello_act(ctx: WorkflowActivityContext, wf_input):

def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflow_runtime = WorkflowRuntime(host, port)
workflow_runtime = WorkflowRuntime()
workflow_runtime.register_workflow(hello_world_wf)
workflow_runtime.register_activity(hello_act)
workflow_runtime.start()
Expand Down Expand Up @@ -107,8 +105,8 @@ def main():
sleep(1)
get_response = d.get_workflow(instance_id=instance_id,
workflow_component=workflow_component)
print(
f"Get response from {workflow_name} after terminate call: {get_response.runtime_status}")
print(f"Get response from {workflow_name} "
f"after terminate call: {get_response.runtime_status}")

# Purge Test
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def send_alert(ctx, message: str):


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime = wf.WorkflowRuntime()
workflowRuntime.register_workflow(status_monitor_workflow)
workflowRuntime.register_activity(check_status)
workflowRuntime.register_activity(send_alert)
Expand Down
35 changes: 19 additions & 16 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
from dapr.ext.workflow.util import getAddress

from dapr.clients import DaprInternalError
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
from dapr.conf import settings

from dapr.conf.helpers import GrpcEndpoint

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand All @@ -40,16 +42,22 @@ class DaprWorkflowClient:
This client is intended to be used by workflow application, not by general purpose
application.
"""

def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
address = getAddress(host, port)

try:
uri = GrpcEndpoint(address)
except ValueError as error:
raise DaprInternalError(f'{error}') from error

metadata = tuple()
if settings.DAPR_API_TOKEN:
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
self.__obj = client.TaskHubGrpcClient(host_address=address, metadata=metadata)
self.__obj = client.TaskHubGrpcClient(host_address=uri.endpoint, metadata=metadata,
secure_channel=uri.tls)

def schedule_new_workflow(self,
workflow: Workflow, *,
input: Optional[TInput] = None,
def schedule_new_workflow(self, workflow: Workflow, *, input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None) -> str:
"""Schedules a new workflow instance for execution.
Expand All @@ -67,9 +75,8 @@ def schedule_new_workflow(self,
Returns:
The ID of the scheduled workflow instance.
"""
return self.__obj.schedule_new_orchestration(workflow.__name__,
input=input, instance_id=instance_id,
start_at=start_at)
return self.__obj.schedule_new_orchestration(workflow.__name__, input=input,
instance_id=instance_id, start_at=start_at)

def get_workflow_state(self, instance_id: str, *,
fetch_payloads: bool = True) -> Optional[WorkflowState]:
Expand All @@ -88,8 +95,7 @@ def get_workflow_state(self, instance_id: str, *,
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
return WorkflowState(state) if state else None

def wait_for_workflow_start(self, instance_id: str, *,
fetch_payloads: bool = False,
def wait_for_workflow_start(self, instance_id: str, *, fetch_payloads: bool = False,
timeout_in_seconds: int = 60) -> Optional[WorkflowState]:
"""Waits for a workflow to start running and returns a WorkflowState object that contains
metadata about the started workflow.
Expand All @@ -109,13 +115,11 @@ def wait_for_workflow_start(self, instance_id: str, *,
WorkflowState record that describes the workflow instance and its execution status.
If the specified workflow isn't found, the WorkflowState.Exists value will be false.
"""
state = self.__obj.wait_for_orchestration_start(instance_id,
fetch_payloads=fetch_payloads,
state = self.__obj.wait_for_orchestration_start(instance_id, fetch_payloads=fetch_payloads,
timeout=timeout_in_seconds)
return WorkflowState(state) if state else None

def wait_for_workflow_completion(self, instance_id: str, *,
fetch_payloads: bool = True,
def wait_for_workflow_completion(self, instance_id: str, *, fetch_payloads: bool = True,
timeout_in_seconds: int = 60) -> Optional[WorkflowState]:
"""Waits for a workflow to complete and returns a WorkflowState object that contains
metadata about the started instance.
Expand Down Expand Up @@ -172,8 +176,7 @@ def raise_workflow_event(self, instance_id: str, event_name: str, *,
"""
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

def terminate_workflow(self, instance_id: str, *,
output: Optional[Any] = None):
def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
"""Terminates a running workflow instance and updates its runtime status to
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
the task hub. When the task hub worker processes this message, it will update the runtime
Expand Down
11 changes: 10 additions & 1 deletion ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext
from dapr.ext.workflow.util import getAddress

from dapr.clients import DaprInternalError
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand All @@ -39,7 +42,13 @@ def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
address = getAddress(host, port)

self.__worker = worker.TaskHubGrpcWorker(host_address=address, metadata=metadata)
try:
uri = GrpcEndpoint(address)
except ValueError as error:
raise DaprInternalError(f'{error}') from error

self.__worker = worker.TaskHubGrpcWorker(host_address=uri.endpoint, metadata=metadata,
secure_channel=uri.tls)

def register_workflow(self, fn: Workflow):
def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
Expand Down
Loading