-
Notifications
You must be signed in to change notification settings - Fork 0
/
sample_3.py
88 lines (61 loc) · 2.35 KB
/
sample_3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""Heterogeneous Result Kind
"""
import time
import random
from dataclasses import dataclass
from src.conduit import AsyncConduit
from src.result import JsonResult, LocalResultIO
from src.dag import Dag, node_registrator
from src.node import Node
@dataclass
class CustomResult(JsonResult):
    """Result kind holding a ``stdout`` and a ``stderr`` string.

    Returned by ``my_callback`` in this sample.
    """

    # Text reported as the standard-output part of the result.
    stdout: str
    # Text reported as the standard-error part of the result.
    stderr: str
@dataclass
class CustomResultB(JsonResult):
    """Second result kind, carrying only ``stdout``.

    It exists to demonstrate a 'heterogeneous result kind': a result class
    that differs from ``CustomResult``.
    """

    # Text reported as the standard-output part of the result.
    stdout: str
# Callback used by the Source nodes
def my_callback(node: Node, dep_results: dict[str, CustomResult], message=None) -> CustomResult:
    """Log the dependency results, pause briefly, and return a ``CustomResult``.

    Args:
        node: Node being executed; its ``label`` tags the log line and the
            fields of the returned result.
        dep_results: Dependency results handed to the callback; they are only
            printed here.
        message: Optional text; when truthy it is appended to the log line.

    Returns:
        ``CustomResult`` built from ``"<label>-stdout"`` and
        ``"<label>-stderr"``.
    """
    log_line = f"[node-{node.label}] Dependency Results - {dep_results}"
    if message:
        log_line += f" | Message: {message}"
    print(log_line)

    # Simulate a long-running process with a random pause of 1 or 2 seconds.
    pause_seconds = random.randint(1, 2)
    time.sleep(pause_seconds)

    return CustomResult(f"{node.label}-stdout", f"{node.label}-stderr")
# Create a Dag instance; the `node_registrator` decorators below receive it as their first argument.
dag = Dag()
# Create the Source Nodes: each is given a label, `my_callback`, and the `CustomResult` class.
node_1 = Node("1", my_callback, CustomResult)
# NOTE(review): `message` is presumably forwarded to `my_callback` as its `message` argument — confirm against `Node`.
node_2 = Node("2", my_callback, CustomResult, message="Hello World")
def _cb_func(node, dep_results):
    """Build, print, and return the ``CustomResultB`` for ``node``.

    Args:
        node: Node being executed; only its ``label`` is read.
        dep_results: Dependency results; accepted to match the callback
            signature but not used here.

    Returns:
        ``CustomResultB`` constructed from ``"<label>-stdout"``.
    """
    outcome = CustomResultB(f"{node.label}-stdout")
    print(f"[node-{node.label}] {outcome}")
    return outcome
# "Non-Source" nodes dependent on "Source" nodes must use a `Node` object instead of strings.
@node_registrator(dag, "3", depends_on=[node_1, node_2])
def cb_3(node, dep_results) -> CustomResultB:
    """Callback registered under label "3", depending on ``node_1`` and ``node_2``."""
    return _cb_func(node=node, dep_results=dep_results)
@node_registrator(dag, "4", depends_on=["3"])
def cb_4(node, dep_results) -> CustomResultB:
    """Callback registered under label "4", depending on "3"."""
    return _cb_func(node=node, dep_results=dep_results)
@node_registrator(dag, "5", depends_on=["3"])
def cb_5(node, dep_results) -> CustomResultB:
    """Callback registered under label "5", depending on "3"."""
    return _cb_func(node=node, dep_results=dep_results)
@node_registrator(dag, "6", depends_on=["5"])
def cb_6(node, dep_results) -> CustomResultB:
    """Callback registered under label "6", depending on "5"."""
    return _cb_func(node=node, dep_results=dep_results)
@node_registrator(dag, "7", depends_on=["4", "6"])
def cb_7(node, dep_results) -> CustomResultB:
    """Callback registered under label "7", depending on "4" and "6"."""
    return _cb_func(node=node, dep_results=dep_results)
@node_registrator(dag, "8", depends_on=["7"])
def cb_8(node, dep_results) -> CustomResultB:
    """Callback registered under label "8", depending on "7".

    Named after its own node label so that it does not redefine (and shadow)
    the module-level callback belonging to node "7".

    Args:
        node: Node being executed; passed through to ``_cb_func``.
        dep_results: Dependency results; passed through to ``_cb_func``.

    Returns:
        The ``CustomResultB`` produced by ``_cb_func``.
    """
    return _cb_func(node, dep_results)
# Print every arc of the DAG as "<source> --> <destination>", then a blank line.
for arc in dag.arcs:
    src, dst = arc
    print(f"{src} --> {dst}")
print()

# Build the conduit from the DAG and a local result IO object, then start it.
res_io = LocalResultIO()
async_conduit = AsyncConduit.create_with_clean_start(dag, res_io)
async_conduit.start()