
[core][compiled graphs] Meta-issue: Support collective communication ops #47983

Open
2 of 10 tasks
stephanie-wang opened this issue Oct 10, 2024 · 3 comments
Labels
compiled-graphs enhancement Request for new feature and/or capability P1 Issue that should be fixed within a few weeks

Comments

stephanie-wang commented Oct 10, 2024

Description

This is a meta-issue to track progress for tasks related to collective communication. See RFC for more details.

Roadmap:

Use case

No response

@stephanie-wang stephanie-wang added enhancement Request for new feature and/or capability triage Needs triage (eg: priority, bug/not-bug, and owning component) compiled-graphs P1 Issue that should be fixed within a few weeks and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Oct 10, 2024
stephanie-wang pushed a commit that referenced this issue Oct 21, 2024
aDAG currently does not support collective APIs. We would like to add
support for them, starting with allreduce.

This PR supports allreduce by introducing syntactic sugar,
`ray.experimental.collective.allreduce.bind`. The `bind` call accepts the
arguments `input_nodes`, `op`, and `transport`. It returns a list of
`CollectiveOutputNode`s as the allreduce results, with the same size as
`input_nodes`. The allreduce results are written to newly allocated tensors.
In the `COMPUTE` operation of `CollectiveOutputNode`, the corresponding
NCCL collective API is called. No changes are required for the input and
output channels of `CollectiveOutputNode`.
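Conceptually, allreduce replaces each worker's tensor with the elementwise reduction of all workers' tensors, and every worker receives the same result in a freshly allocated output. A minimal, NCCL-free sketch of these semantics (plain Python lists stand in for GPU tensors; the function name is illustrative, not part of the Ray API):

```python
# Illustrative sketch of allreduce(SUM) semantics, not the Ray implementation.
# Each worker contributes one tensor; every worker receives the same
# elementwise sum, written to a newly allocated output.

def allreduce_sum(input_tensors):
    """Return one freshly allocated reduced tensor per input tensor."""
    if not input_tensors:
        return []
    length = len(input_tensors[0])
    # Mirrors API requirement 4: all tensors must have the same shape.
    assert all(len(t) == length for t in input_tensors), "shapes must match"
    reduced = [sum(vals) for vals in zip(*input_tensors)]
    # Each worker gets its own copy, mirroring the new-tensor allocation.
    return [list(reduced) for _ in input_tensors]

outputs = allreduce_sum([[1, 2], [3, 4], [5, 6]])
# Every worker observes the same reduced values: [9, 12]
```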

Proposed new API:

```python
import ray.experimental.collective as collective

with InputNode() as inp:
    dag = [worker.return_tensor.bind(inp) for worker in workers]
    dag = collective.allreduce.bind(dag, ReduceOp.SUM)
    dag = MultiOutputNode(dag)
```

API Requirements:
1. Input nodes are unique.
2. Actor handles are unique.
3. Actor handles match the custom NCCL group if specified.
4. All tensors have the same shape.

Requirements 1-3 are checked in the `_CollectiveOperation` constructor.
Requirement 4 is checked at runtime via a timeout.
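As a rough sketch, the constructor-time checks for requirements 1-3 could look like the following. The helper name and signature are hypothetical; the actual logic lives in Ray's `_CollectiveOperation` and may differ.

```python
# Hypothetical sketch of the constructor-time validation; not Ray's
# actual `_CollectiveOperation` code.

def validate_collective(input_nodes, actor_handles, nccl_group_actors=None):
    # Requirement 1: input nodes are unique (compared by identity).
    if len(set(map(id, input_nodes))) != len(input_nodes):
        raise ValueError("Input nodes must be unique")
    # Requirement 2: actor handles are unique.
    if len(set(actor_handles)) != len(actor_handles):
        raise ValueError("Actor handles must be unique")
    # Requirement 3: actors match the custom NCCL group, if one is given.
    if nccl_group_actors is not None and set(actor_handles) != set(nccl_group_actors):
        raise ValueError("Actor handles must match the custom NCCL group")
```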

The operation scheduling is also updated to account for NCCL collective
operations: when an NCCL collective node is selected, all the other
collective nodes in its collective group must be selected as well.
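A toy sketch of that scheduling rule follows. All names are illustrative; the real compiled-graphs scheduler is considerably more involved, but the core idea is that a collective group is scheduled atomically so that no rank blocks waiting for a peer that was never selected.

```python
# Toy sketch: when one node of an NCCL collective group is picked,
# schedule the whole group together to avoid deadlock between ranks.
# Names are illustrative, not Ray's actual scheduler API.

def schedule(ready_nodes, collective_group_of):
    scheduled, seen = [], set()
    for node in ready_nodes:
        if node in seen:
            continue
        # Non-collective nodes form a singleton "group".
        group = collective_group_of.get(node, [node])
        # All members of a collective group are selected together.
        for member in group:
            if member not in seen:
                scheduled.append(member)
                seen.add(member)
    return scheduled

groups = {"ar0": ["ar0", "ar1"], "ar1": ["ar0", "ar1"]}
order = schedule(["a", "ar0", "b", "ar1"], groups)
# Selecting "ar0" pulls in "ar1" immediately: ['a', 'ar0', 'ar1', 'b']
```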

Meta-issue: #47983


---------

Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Co-authored-by: Yuhan Ruan <[email protected]>
Jay-ju pushed a commit to Jay-ju/ray that referenced this issue Nov 5, 2024
JP-sDEV pushed a commit to JP-sDEV/ray that referenced this issue Nov 14, 2024
mohitjain2504 pushed a commit to mohitjain2504/ray that referenced this issue Nov 15, 2024
@jeffreyjeffreywang
Contributor

Hey @stephanie-wang, just wanted to make sure this meta-task is still on the roadmap before I start the implementation. I'm particularly interested in #47938, but I think it's better to start with supporting all-to-all, all-to-one, and one-to-all collectives. I took a look at the RFC, and here is my understanding:

  • All-to-one: pass the reader worker handle to the collective call.

```python
workers = [Worker.options(num_gpus=1).remote() for _ in range(3)]
nccl_group_handle = ray.collective.NcclGroup(workers)
with InputNode() as inp:
    results = [worker.fwd.bind(inp) for worker in workers]
    # Pass the worker handle to the collective call.
    dag = ray.collective.gather.bind(
        results, workers[0],
        transport=nccl_group_handle)
    dag = workers[0].sync.bind(dag)

# Errors if the `gather` reader is not part of the group.
dag = dag.experimental_compile()
```

  • One-to-all: pass the sender worker handle to the collective call.

```python
workers = [Worker.options(num_gpus=1).remote() for _ in range(3)]
nccl_group_handle = ray.collective.NcclGroup(workers)
# One-to-all pattern.
with InputNode() as inp:
    result = workers[0].fwd.bind(inp)
    results = ray.collective.broadcast.bind(
        result, workers,
        transport=nccl_group_handle)
    dag = MultiOutputNode(results)

# Errors if the `broadcast` sender is not part of the group.
dag = dag.experimental_compile()
```

Is this the design that we agreed upon? If this looks good to you, I'll create issues to track the support for all-to-all, all-to-one, and one-to-all patterns.

@stephanie-wang
Contributor Author

> Is this the design that we agreed upon? If this looks good to you, I'll create issues to track the support for all-to-all, all-to-one, and one-to-all patterns.

Yes, everything here is still on the roadmap and open to contributions!


jeffreyjeffreywang commented Dec 18, 2024

Thanks Stephanie, here are the links to the issues for all-to-one and one-to-all.
