Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: Refactor merge sort code example to use literalinclude #6091

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 3 additions & 57 deletions docs/user_guide/advanced_composition/dynamic_workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,63 +141,9 @@ resulting in less noticeable overhead.
Merge sort is a perfect example to showcase how to seamlessly achieve recursion using dynamic workflows.
Flyte imposes limitations on the depth of recursion to prevent misuse and potential impacts on the overall stability of the system.

```python
from typing import Tuple

from flytekit import conditional, dynamic, task, workflow


@task
def split(numbers: list[int]) -> Tuple[list[int], list[int], int, int]:
return (
numbers[0 : int(len(numbers) / 2)],
numbers[int(len(numbers) / 2) :],
int(len(numbers) / 2),
int(len(numbers)) - int(len(numbers) / 2),
)


@task
def merge(sorted_list1: list[int], sorted_list2: list[int]) -> list[int]:
result = []
while len(sorted_list1) > 0 and len(sorted_list2) > 0:
# Compare the current element of the first array with the current element of the second array.
# If the element in the first array is smaller, append it to the result and increment the first array index.
# Otherwise, do the same with the second array.
if sorted_list1[0] < sorted_list2[0]:
result.append(sorted_list1.pop(0))
else:
result.append(sorted_list2.pop(0))

# Extend the result with the remaining elements from both arrays
result.extend(sorted_list1)
result.extend(sorted_list2)

return result


@task
def sort_locally(numbers: list[int]) -> list[int]:
return sorted(numbers)


@dynamic
def merge_sort_remotely(numbers: list[int], run_local_at_count: int) -> list[int]:
split1, split2, new_count1, new_count2 = split(numbers=numbers)
sorted1 = merge_sort(numbers=split1, numbers_count=new_count1, run_local_at_count=run_local_at_count)
sorted2 = merge_sort(numbers=split2, numbers_count=new_count2, run_local_at_count=run_local_at_count)
return merge(sorted_list1=sorted1, sorted_list2=sorted2)


@workflow
def merge_sort(numbers: list[int], numbers_count: int, run_local_at_count: int = 5) -> list[int]:
return (
conditional("terminal_case")
.if_(numbers_count <= run_local_at_count)
.then(sort_locally(numbers=numbers))
.else_()
.then(merge_sort_remotely(numbers=numbers, run_local_at_count=run_local_at_count))
)
```{literalinclude} /examples/advanced_composition/advanced_composition/dynamic_workflow.py
:caption: advanced_composition/dynamic_workflow.py
:lines: 84-134
```

By simply adding the `@dynamic` annotation, the `merge_sort_remotely` function transforms into a plan of execution,
Expand Down
Loading