Skip to content

CedricAnover/proof-of-concept-dag

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

25 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

proof-of-concept-dag

Proof-of-Concept of Concurrent DAG for Marsh (https://github.com/CedricAnover/marsh)

Example 1

import time
import random
from pathlib import Path
from dataclasses import dataclass

from conduit import AsyncConduit
from result import JsonResult, LocalResultIO
from dag import Dag
from node import Node


@dataclass
class CustomResult(JsonResult):
    """Result kind used by this example's nodes: a `JsonResult` with two string fields."""
    # Filled with "<label>-stdout" by `my_callback`.
    stdout: str
    # Filled with "<label>-stderr" by `my_callback`.
    stderr: str


def my_callback(node: Node, dep_results: dict[str, CustomResult], message=None) -> CustomResult:
    """Print the node's dependency results, pause briefly, and return a placeholder result.

    Args:
        node: The node being executed; only its `label` is read here.
        dep_results: Mapping of string keys to the results of the node's dependencies.
        message: Optional text; when truthy it is appended to the printed line.

    Returns:
        A `CustomResult` holding "<label>-stdout" and "<label>-stderr".
    """
    report = (
        f"[node-{node.label}] Dependency Results - {dep_results} | Message: {message}"
        if message
        else f"[node-{node.label}] Dependency Results - {dep_results}"
    )
    print(report)
    # Simulate long-running process: block for a random 1 or 2 seconds.
    pause_seconds = random.randint(1, 2)
    time.sleep(pause_seconds)
    return CustomResult(stdout=f"{node.label}-stdout", stderr=f"{node.label}-stderr")


# Eight nodes share one callback and one result kind; nodes "2" and "6" are
# additionally constructed with a `message` keyword.
node_1 = Node("1", my_callback, CustomResult)
node_2 = Node("2", my_callback, CustomResult, message="Hello World")
node_3 = Node("3", my_callback, CustomResult)
node_4 = Node("4", my_callback, CustomResult)
node_5 = Node("5", my_callback, CustomResult)
node_6 = Node("6", my_callback, CustomResult, message="Some Message")
node_7 = Node("7", my_callback, CustomResult)
node_8 = Node("8", my_callback, CustomResult)

# Wire the graph. Each pair is (source, destination); arcs are added in the
# order listed.
dag = Dag()
for _source, _destination in (
    (node_1, node_3),
    (node_2, node_3),
    (node_3, node_4),
    (node_3, node_5),
    (node_5, node_6),
    (node_4, node_7),
    (node_7, node_8),
    (node_6, node_7),
):
    dag.add_arc(_source, _destination)

# Print every arc, followed by a blank separator line.
for arc in dag.arcs:
    src, dst = arc
    print(f"{src} --> {dst}")
print()

# Execute the DAG through an AsyncConduit backed by a LocalResultIO.
res_io = LocalResultIO()
async_conduit = AsyncConduit.create_with_clean_start(dag, res_io)
async_conduit.start()

Output:

1 --> 3
2 --> 3
3 --> 4
3 --> 5
5 --> 6
4 --> 7
7 --> 8
6 --> 7

[node-1] Running.
[node-1] Dependency Results - {}
[node-2] Running.
[node-2] Dependency Results - {} | Message: Hello World
[node-1] Done.
[node-2] Done.
[node-3] Ready for execution.
[node-3] Running.
[node-3] Dependency Results - {'1': CustomResult(stdout='1-stdout', stderr='1-stderr'), '2': CustomResult(stdout='2-stdout', stderr='2-stderr')}
[node-3] Done.
[node-5] Ready for execution.
[node-4] Ready for execution.
[node-5] Running.
[node-4] Running.
[node-5] Dependency Results - {'3': CustomResult(stdout='3-stdout', stderr='3-stderr')}
[node-4] Dependency Results - {'3': CustomResult(stdout='3-stdout', stderr='3-stderr')}
[node-5] Done.
[node-4] Done.
[node-6] Ready for execution.
[node-6] Running.
[node-6] Dependency Results - {'5': CustomResult(stdout='5-stdout', stderr='5-stderr')} | Message: Some Message
[node-6] Done.
[node-7] Ready for execution.
[node-7] Running.
[node-7] Dependency Results - {'4': CustomResult(stdout='4-stdout', stderr='4-stderr'), '6': CustomResult(stdout='6-stdout', stderr='6-stderr')}
[node-7] Done.
[node-8] Ready for execution.
[node-8] Running.
[node-8] Dependency Results - {'7': CustomResult(stdout='7-stdout', stderr='7-stderr')}
[node-8] Done.

Example 2

Use node_registrator to automatically create a node from a callback and register it in the DAG together with its dependencies.

import time
import random
from pathlib import Path
from dataclasses import dataclass

from conduit import AsyncConduit
from result import JsonResult, LocalResultIO
from dag import Dag, node_registrator
from node import Node


@dataclass
class CustomResult(JsonResult):
    """Result kind used by this example's nodes: a `JsonResult` with two string fields."""
    # Filled with "<label>-stdout" by the example callbacks.
    stdout: str
    # Filled with "<label>-stderr" by the example callbacks.
    stderr: str


# Define a callback for Source nodes
def my_callback(node: Node, dep_results: dict[str, CustomResult], message=None) -> CustomResult:
    """Print the node's dependency results, pause briefly, and return a placeholder result.

    Args:
        node: The node being executed; only its `label` is read here.
        dep_results: Mapping of string keys to the results of the node's dependencies.
        message: Optional text; when truthy it is appended to the printed line.

    Returns:
        A `CustomResult` holding "<label>-stdout" and "<label>-stderr".
    """
    report = (
        f"[node-{node.label}] Dependency Results - {dep_results} | Message: {message}"
        if message
        else f"[node-{node.label}] Dependency Results - {dep_results}"
    )
    print(report)
    # Simulate long-running process: block for a random 1 or 2 seconds.
    pause_seconds = random.randint(1, 2)
    time.sleep(pause_seconds)
    return CustomResult(stdout=f"{node.label}-stdout", stderr=f"{node.label}-stderr")


# Create the Dag instance that the `node_registrator` decorators below register into.
dag = Dag()

# Create the source nodes explicitly; both use `my_callback` and the
# `CustomResult` result kind. Node "2" is also given a `message` keyword.
node_1 = Node("1", my_callback, CustomResult)
node_2 = Node("2", my_callback, CustomResult, message="Hello World")


def _cb_func(node, dep_results):
    """Shared body for the registered callbacks.

    Builds a placeholder `CustomResult` from the node's label, prints the
    dependency results tagged with that label, and returns the result.
    """
    outcome = CustomResult(
        stdout=f"{node.label}-stdout",
        stderr=f"{node.label}-stderr",
    )
    print(f"[node-{node.label}] {dep_results}")
    return outcome


# A non-source node that depends on source nodes must list them in
# `depends_on` as `Node` objects rather than as label strings.
@node_registrator(dag, "3", depends_on=[node_1, node_2])
def cb_3(node, dep_results) -> CustomResult:
    """Callback registered as node "3", depending on `node_1` and `node_2`; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "4", depends_on=["3"])
def cb_4(node, dep_results) -> CustomResult:
    """Callback registered as node "4", depending on node "3"; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "5", depends_on=["3"])
def cb_5(node, dep_results) -> CustomResult:
    """Callback registered as node "5", depending on node "3"; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "6", depends_on=["5"])
def cb_6(node, dep_results) -> CustomResult:
    """Callback registered as node "6", depending on node "5"; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "7", depends_on=["4", "6"])
def cb_7(node, dep_results) -> CustomResult:
    """Callback registered as node "7", depending on nodes "4" and "6"; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "8", depends_on=["7"])
def cb_8(node, dep_results) -> CustomResult:
    """Callback registered as node "8", depending on node "7"; delegates to `_cb_func`.

    The function carries its own name (`cb_8`) so that defining it does not
    rebind the module-level name `cb_7`, which belongs to node "7"'s callback.
    """
    return _cb_func(node, dep_results)


# Print every arc, followed by a blank separator line.
for arc in dag.arcs:
    src, dst = arc
    print(f"{src} --> {dst}")
print()


# Execute the DAG through an AsyncConduit backed by a LocalResultIO.
res_io = LocalResultIO()
async_conduit = AsyncConduit.create_with_clean_start(dag, res_io)
async_conduit.start()

Output:

1 --> 3
2 --> 3
3 --> 4
3 --> 5
5 --> 6
4 --> 7
6 --> 7
7 --> 8

[node-1] Running.
[node-1] Dependency Results - {}
[node-2] Running.
[node-2] Dependency Results - {} | Message: Hello World
[node-1] Done.
[node-2] Done.
[node-3] Ready for execution.
[node-3] Running.
[node-3] {'1': CustomResult(stdout='1-stdout', stderr='1-stderr'), '2': CustomResult(stdout='2-stdout', stderr='2-stderr')}
[node-3] Done.
[node-5] Ready for execution.
[node-4] Ready for execution.
[node-5] Running.
[node-4] Running.
[node-4] {'3': CustomResult(stdout='3-stdout', stderr='3-stderr')}
[node-5] {'3': CustomResult(stdout='3-stdout', stderr='3-stderr')}
[node-4] Done.
[node-5] Done.
[node-6] Ready for execution.
[node-6] Running.
[node-6] {'5': CustomResult(stdout='5-stdout', stderr='5-stderr')}
[node-6] Done.
[node-7] Ready for execution.
[node-7] Running.
[node-7] {'4': CustomResult(stdout='4-stdout', stderr='4-stderr'), '6': CustomResult(stdout='6-stdout', stderr='6-stderr')}
[node-7] Done.
[node-8] Ready for execution.
[node-8] Running.
[node-8] {'7': CustomResult(stdout='7-stdout', stderr='7-stderr')}
[node-8] Done.

Example 3: Heterogeneous Result

Each callback must declare, through its return type hint, the kind of result it returns. A TypeError is raised if the callback's function definition omits that type hint.

import time
import random
from dataclasses import dataclass

from conduit import AsyncConduit
from result import JsonResult, LocalResultIO
from dag import Dag, node_registrator
from node import Node


@dataclass
class CustomResult(JsonResult):
    """Result kind returned by the source-node callback: a `JsonResult` with two string fields."""
    # Filled with "<label>-stdout" by `my_callback`.
    stdout: str
    # Filled with "<label>-stderr" by `my_callback`.
    stderr: str


@dataclass
class CustomResultB(JsonResult):
    """This custom result kind is used for demonstrating 'heterogeneous result kind'.

    It differs from `CustomResult` by having only a `stdout` field.
    """
    # Filled with "<label>-stdout" by `_cb_func`.
    stdout: str

# Define a callback for Source nodes
def my_callback(node: Node, dep_results: dict[str, CustomResult], message=None) -> CustomResult:
    """Print the node's dependency results, pause briefly, and return a placeholder result.

    Args:
        node: The node being executed; only its `label` is read here.
        dep_results: Mapping of string keys to the results of the node's dependencies.
        message: Optional text; when truthy it is appended to the printed line.

    Returns:
        A `CustomResult` holding "<label>-stdout" and "<label>-stderr".
    """
    report = (
        f"[node-{node.label}] Dependency Results - {dep_results} | Message: {message}"
        if message
        else f"[node-{node.label}] Dependency Results - {dep_results}"
    )
    print(report)
    # Simulate long-running process: block for a random 1 or 2 seconds.
    pause_seconds = random.randint(1, 2)
    time.sleep(pause_seconds)
    return CustomResult(stdout=f"{node.label}-stdout", stderr=f"{node.label}-stderr")


# Create the Dag instance that the `node_registrator` decorators below register into.
dag = Dag()

# Create the source nodes explicitly; both use `my_callback` and the
# `CustomResult` result kind. Node "2" is also given a `message` keyword.
node_1 = Node("1", my_callback, CustomResult)
node_2 = Node("2", my_callback, CustomResult, message="Hello World")


def _cb_func(node, dep_results):
    """Shared body for the registered callbacks.

    Builds a placeholder `CustomResultB` from the node's label, prints it
    tagged with that label, and returns it. `dep_results` is accepted but
    not used.
    """
    res = CustomResultB(stdout=f"{node.label}-stdout")
    print(f"[node-{node.label}] {res}")
    return res


# A non-source node that depends on source nodes must list them in
# `depends_on` as `Node` objects rather than as label strings.
@node_registrator(dag, "3", depends_on=[node_1, node_2])
def cb_3(node, dep_results) -> CustomResultB:
    """Callback registered as node "3", depending on `node_1` and `node_2`; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "4", depends_on=["3"])
def cb_4(node, dep_results) -> CustomResultB:
    """Callback registered as node "4", depending on node "3"; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "5", depends_on=["3"])
def cb_5(node, dep_results) -> CustomResultB:
    """Callback registered as node "5", depending on node "3"; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "6", depends_on=["5"])
def cb_6(node, dep_results) -> CustomResultB:
    """Callback registered as node "6", depending on node "5"; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "7", depends_on=["4", "6"])
def cb_7(node, dep_results) -> CustomResultB:
    """Callback registered as node "7", depending on nodes "4" and "6"; delegates to `_cb_func`."""
    return _cb_func(node, dep_results)


@node_registrator(dag, "8", depends_on=["7"])
def cb_8(node, dep_results) -> CustomResultB:
    """Callback registered as node "8", depending on node "7"; delegates to `_cb_func`.

    The function carries its own name (`cb_8`) so that defining it does not
    rebind the module-level name `cb_7`, which belongs to node "7"'s callback.
    """
    return _cb_func(node, dep_results)


# Print every arc, followed by a blank separator line.
for arc in dag.arcs:
    src, dst = arc
    print(f"{src} --> {dst}")
print()

# Execute the DAG through an AsyncConduit backed by a LocalResultIO.
res_io = LocalResultIO()
async_conduit = AsyncConduit.create_with_clean_start(dag, res_io)
async_conduit.start()

Output:

1 --> 3
2 --> 3
3 --> 4
3 --> 5
5 --> 6
4 --> 7
6 --> 7
7 --> 8

[node-1] Running.
[node-1] Dependency Results - {}
[node-2] Running.
[node-2] Dependency Results - {} | Message: Hello World
[node-2] Done.
[node-1] Done.
[node-3] Ready for execution.
[node-3] Running.
[node-3] CustomResultB(stdout='3-stdout')
[node-3] Done.
[node-5] Ready for execution.
[node-4] Ready for execution.
[node-5] Running.
[node-4] Running.
[node-4] CustomResultB(stdout='4-stdout')
[node-5] CustomResultB(stdout='5-stdout')
[node-4] Done.
[node-5] Done.
[node-6] Ready for execution.
[node-6] Running.
[node-6] CustomResultB(stdout='6-stdout')
[node-6] Done.
[node-7] Ready for execution.
[node-7] Running.
[node-7] CustomResultB(stdout='7-stdout')
[node-7] Done.
[node-8] Ready for execution.
[node-8] Running.
[node-8] CustomResultB(stdout='8-stdout')
[node-8] Done.

About

Proof-of-Concept of Concurrent DAG for Marsh (https://github.com/CedricAnover/marsh)

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages