[core][compiled graphs] Meta-issue: Support collective communication ops #47983
Comments
aDAG currently does not support collective APIs. We would like to add support for collective APIs, starting with allreduce.

This PR supports allreduce by introducing syntactic sugar: `ray.experimental.collective.allreduce.bind`. The `bind` accepts the arguments `input_nodes`, `op`, and `transport`. It returns a list of `CollectiveOutputNode`s as the allreduce results, with the same size as `input_nodes`. The allreduce results are written to newly allocated tensors. In the `COMPUTE` operation of `CollectiveOutputNode`, the corresponding NCCL collective API is called. No changes are required for the input and output channels of `CollectiveOutputNode`.

Proposed new API:

```python
import ray.experimental.collective as collective

with InputNode() as inp:
    dag = [worker.return_tensor.bind(inp) for worker in workers]
    dag = collective.allreduce.bind(dag, ReduceOp.SUM)
    dag = MultiOutputNode(dag)
```

API requirements:

1. Input nodes are unique.
2. Actor handles are unique.
3. Actor handles match the custom NCCL group if specified.
4. All tensors have the same shape.

Requirements 1-3 are checked in the `_CollectiveOperation` constructor. Requirement 4 is checked at runtime via a timeout.

The operation scheduling is also updated to account for NCCL collective operations: when an NCCL collective node is selected, all the corresponding collective nodes in its collective group must be selected as well.

Meta-issue: #47983

---------

Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Co-authored-by: Yuhan Ruan <[email protected]>
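For context, here is a more complete usage sketch built around the snippet above. The `Worker` actor class, the tensor shape, the number of workers, and the `ReduceOp` import path are illustrative assumptions; only the `collective.allreduce.bind` call and the DAG structure come from the PR description.

```python
# Hedged end-to-end sketch of the proposed API. The Worker class, tensor
# shape, and the ReduceOp import path are illustrative assumptions; the
# collective.allreduce.bind call mirrors the snippet above.
import ray
import torch
from ray.dag import InputNode, MultiOutputNode
import ray.experimental.collective as collective
from ray.experimental.util.types import ReduceOp  # assumed import path


@ray.remote(num_gpus=1)
class Worker:
    def return_tensor(self, value: int) -> torch.Tensor:
        # Every worker returns a GPU tensor of the same shape (requirement 4).
        return torch.ones(8, device="cuda") * value


workers = [Worker.remote() for _ in range(2)]

with InputNode() as inp:
    tensors = [worker.return_tensor.bind(inp) for worker in workers]
    # One CollectiveOutputNode per input node, holding the reduced result.
    reduced = collective.allreduce.bind(tensors, ReduceOp.SUM)
    dag = MultiOutputNode(reduced)

compiled_dag = dag.experimental_compile()
# With value=2 and two workers, each output tensor should contain all 4s.
results = ray.get(compiled_dag.execute(2))
```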
Hey @stephanie-wang, just wanted to make sure this meta-task is still on the roadmap before I start the implementation. I'm particularly interested in #47938, but I think it's better to start with supporting all-to-all, all-to-one, and one-to-all collectives. I took a look at the RFC, and here is my understanding:
Is this the design that we agreed upon? If this looks good to you, I'll create issues to track the support for all-to-all, all-to-one, and one-to-all patterns.
Yes, everything here is still on the roadmap and open to contributions!
Thanks Stephanie, here are the links to the issues for all-to-one and one-to-all.
Description
This is a meta-issue to track progress on tasks related to collective communication. See the RFC for more details.
Roadmap:
- Support `direct_return=False`, i.e. cases where the user returns a mix of CPU and GPU data (see the sketch below)
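A minimal sketch of the kind of return value this roadmap item refers to; the class and method names, the field names, and the tensor shape are hypothetical, and this is not a working compiled-graph example:

```python
# Illustrative only: the method returns plain CPU Python data alongside a
# GPU tensor, which is the mixed CPU/GPU (direct_return=False) case that
# the roadmap item above refers to.
import torch


class TrainerWorker:
    def train_step(self) -> dict:
        return {
            "loss": 0.123,                              # CPU Python object
            "grads": torch.zeros(1024, device="cuda"),  # GPU tensor
        }
```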
Use case

No response