-
Notifications
You must be signed in to change notification settings - Fork 0
/
sample_4.py
55 lines (44 loc) · 1.68 KB
/
sample_4.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""Example: ThreadPoolConduit"""
import time
import random
from dataclasses import dataclass
from src.conduit import ThreadPoolConduit
from src.result import JsonResult, LocalResultIO
from src.dag import Dag
from src.node import Node
@dataclass
class CustomResult(JsonResult):
stdout: str
stderr: str
def my_callback(node: Node, dep_results: dict[str, CustomResult], message=None) -> CustomResult:
if message:
print(f"[node-{node.label}] Dependency Results - {dep_results} | Message: {message}")
else:
print(f"[node-{node.label}] Dependency Results - {dep_results}")
# Simulate long-running process
time.sleep(random.randint(1, 2))
return CustomResult(f"{node.label}-stdout", f"{node.label}-stderr")
if __name__ == "__main__":
node_1 = Node("1", my_callback, CustomResult)
node_2 = Node("2", my_callback, CustomResult, message="Hello World")
node_3 = Node("3", my_callback, CustomResult)
node_4 = Node("4", my_callback, CustomResult)
node_5 = Node("5", my_callback, CustomResult)
node_6 = Node("6", my_callback, CustomResult, message="Some Message")
node_7 = Node("7", my_callback, CustomResult)
node_8 = Node("8", my_callback, CustomResult)
dag = Dag()
dag.add_arc(node_1, node_3)
dag.add_arc(node_2, node_3)
dag.add_arc(node_3, node_4)
dag.add_arc(node_3, node_5)
dag.add_arc(node_5, node_6)
dag.add_arc(node_4, node_7)
dag.add_arc(node_7, node_8)
dag.add_arc(node_6, node_7)
for src, dst in dag.arcs:
print(f"{src} --> {dst}")
print()
res_io = LocalResultIO()
async_conduit = ThreadPoolConduit.create_with_clean_start(dag, res_io, max_workers=10)
async_conduit.start()