Commit 0facc33 ("Revising sid WIP")
chhwang committed Oct 12, 2023 (1 parent: 6cb4b6e)
Showing 17 changed files with 186 additions and 121 deletions.
86 changes: 55 additions & 31 deletions ark/include/ark.h
@@ -157,8 +157,9 @@ class Tensor {
public:
/// Tensor constructor.
Tensor(const Dims &shape, const TensorType &type, TensorBuf *buf,
-        const Dims &ldims, const Dims &offs, const Dims &pads, bool exported,
-        int imported_rank, int id, const std::string &name);
+        const Dims &ldims, const Dims &offs, const Dims &pads,
+        int exported_sid, int imported_rank, int id,
+        const std::string &name);
Tensor(const Tensor &) = default;

/// Copy contiguous data from a host buffer to the given tensor's (possibly
@@ -265,8 +266,8 @@ class Tensor {
/// Unit dimensions of the underlying data array. ldims[x] should always
/// be divisible by pads[x].
Dims pads;
- /// Whether this tensor is local and accessed by remote devices.
- bool exported;
+ /// Unique ID to make a local tensor accessible by remote devices.
+ int exported_sid;
/// If imported_rank is non-negative, the tensor is imported from another
/// GPU and does not need a TensorBuf allocated for it.
int imported_rank;
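
Since `exported` changes from a boolean flag to a sentinel-valued ID, call
sites that used to test the flag now compare against -1. A minimal migration
sketch (the helper below is hypothetical, not part of this commit):

#include "ark.h"

// Hypothetical helper: how an "is this tensor exported?" check migrates
// when the bool flag becomes a sentinel-valued ID.
bool is_exported(const ark::Tensor *tns) {
    // Before this commit: return tns->exported;
    // After: any non-negative SID means the tensor is exported.
    return tns->exported_sid >= 0;
}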
@@ -310,9 +311,9 @@ class Model {
/// sets the last dimension of @p pads to 3, then the corresponding ldim
/// will be the minimum multiple of 2x3=6 that is larger than or equal to
/// the corresponding dimension of @p offs + @p shape.
- /// @param exported Whether the tensor is exported to other processes. This
- /// should be set to true if the tensor is used as an input or output of a
- /// remote process.
+ /// @param exported_sid Unique ID to export a local tensor to other
+ /// processes. This should be set to a unique non-negative value if the
+ /// tensor is used as an input or output of a remote process.
/// @param imported_rank The rank of the process that exports the tensor.
/// If @p imported_rank is set to a non-negative value, the tensor will be
/// considered as a remote tensor, hence no memory will be allocated for it
@@ -325,7 +326,7 @@
TensorBuf *buf = nullptr, const Dims &ldims = {},
const Dims &offs = {}, const Dims &pads = {},
const std::vector<Tensor *> &deps = {},
-            bool exported = false, int imported_rank = -1,
+            int exported_sid = -1, int imported_rank = -1,
const std::string &name = "tensor");

Tensor *reshape(Tensor *input, const Dims &shape, bool allowzero = false,
@@ -445,38 +446,61 @@ class Model {
// tensor and the `other` tensor,
Tensor *div(Tensor *input, Tensor *other, Tensor *output = nullptr,
const std::string &name = "div");
- /// Sends a tensor to a destination GPU (@p dst_rank). Multiple tensors can
- /// be sent to the same GPU, so an identifier `id` is required to distinguish
- /// the tensor. Each 'send' operator must have a corresponding 'recv'
- /// operator that has the same id in another GPU's model.
+ /// Sends a tensor to a destination GPU (@p `dst_rank`). Multiple `send()`s
+ /// can be destined to the same rank, so an identifier @p `sid` is required
+ /// to distinguish different `send()`s. Each `send()` must have a
+ /// corresponding @ref `recv()` with the same @p `sid` in @p `dst_rank`'s
+ /// model. Note that the send is asynchronous, so the tensor is not
+ /// guaranteed to be sent until the corresponding @ref `send_done()` is
+ /// executed.
+ ///
- /// @param input
- /// @param id
+ /// @param input The tensor to send.
+ /// @param sid Identifier of the send.
/// @param dst_rank Rank of the GPU to send to.
- /// @param bytes
- /// @param name
- /// @return
- Tensor *send(Tensor *input, int id, int dst_rank, std::size_t bytes = 0,
+ /// @param bytes Number of bytes to send. If 0, the entire tensor will be
+ /// sent.
+ /// @param name Name of the operator.
+ /// @return Identical to @p `input` with execution dependencies.
+ Tensor *send(Tensor *input, int sid, int dst_rank, std::size_t bytes = 0,
const std::string &name = "send");
- // Blocks the execution until the corresponding 'send' operator with the
- // specified `id` is completed.
- Tensor *send_done(Tensor *input, int id, int dst_rank,
+
+ /// Blocks the execution until the corresponding @ref `send()` with
+ /// the specified @p `sid` is completed.
+ ///
+ /// @param input The tensor to send. This should be the same tensor as the
+ /// output of the corresponding @ref `send()`.
+ /// @param sid Identifier of the send.
+ /// @param name Name of the operator.
+ /// @return Identical to @p `input` with execution dependencies.
+ Tensor *send_done(Tensor *input, int sid,
const std::string &name = "send_done");
- // Receives a tensor from a source GPU (@p src_rank), identified by the `id`
- // parameter. Blocks the execution until the corresponding 'recv' operator
- // is completed.
- Tensor *recv(int id, int src_rank, std::size_t bytes = 0,
+
+ /// Receives a tensor from a source GPU (@p `src_rank`), identified by @p
+ /// `sid`. Blocks the execution until the data is fully received.
+ ///
+ /// @param sid Identifier of the corresponding @ref `send()`.
+ /// @param src_rank Rank of the GPU to receive from.
+ /// @param bytes Number of bytes to receive. Currently unused.
+ /// @param output The tensor to receive into. If nullptr, a new tensor will
+ /// be allocated.
+ /// @param name Name of the operator.
+ /// @return The received tensor.
+ Tensor *recv(int sid, int src_rank, std::size_t bytes = 0,
Tensor *output = nullptr, const std::string &name = "recv");
- // Similar to the 'send_done' function, but implemented using CUDA in-stream
- // RDMA copy and Low Latency (LL) protocol.
- Tensor *send_mm(Tensor *input, int id, int gpu_dst, std::size_t bytes = 0,
+
+ /// Blocking send of a tensor to a destination GPU (@p `dst_rank`). Unlike
+ /// @ref `send()` and @ref `send_done()`, this operator sends data through
+ /// a memory map. Therefore, @p `dst_rank` should refer to a GPU that is
+ /// peer-accessible to the current GPU.
+ Tensor *send_mm(Tensor *input, int sid, int dst_rank, std::size_t bytes = 0,
Tensor *output = nullptr,
const std::string &name = "send_mm");
- // Similar to the 'recv' function, but implemented using CUDA in-stream RDMA
- // copy and Low Latency (LL) protocol.
- Tensor *recv_mm(Tensor *input, int id, int gpu_src, std::size_t bytes = 0,
+
+ /// Blocking receive counterpart of @ref `send_mm()`; receives data through
+ /// a memory map from a peer-accessible GPU (@p `src_rank`).
+ Tensor *recv_mm(Tensor *input, int sid, int src_rank, std::size_t bytes = 0,
Tensor *output = nullptr,
const std::string &name = "recv_mm");

// Performs an all-reduce operator across all GPUs, aggregating the input
// tensors. Takes the `input` tensor, the current GPU's `gpu_id`, and the
// total number of GPUs `gpu_num`.
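To make the pairing rules above concrete, here is a minimal sketch of the
revised API (illustrative, not from this commit: two ranks are assumed, and
the sid value 7 is arbitrary; it only has to match across both models):

#include "ark.h"

// Rank 0's model: asynchronous send of `data` to rank 1 under sid 7,
// followed by a send_done() that blocks until the send completes.
void build_rank0(ark::Model &model, ark::Tensor *data) {
    ark::Tensor *s = model.send(data, /*sid=*/7, /*dst_rank=*/1);
    model.send_done(s, /*sid=*/7);
}

// Rank 1's model: blocks until the matching send() under sid 7 from
// rank 0 has fully arrived, then yields the received tensor.
void build_rank1(ark::Model &model) {
    ark::Tensor *r = model.recv(/*sid=*/7, /*src_rank=*/0);
    (void)r;  // use `r` like any other tensor afterwards
}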
31 changes: 29 additions & 2 deletions ark/model.cc
@@ -103,8 +103,8 @@ std::vector<Tensor *> Model::Impl::add_op(Op &op) {
// to be the current Op.
this->tns_storage.emplace_back(make_unique<Tensor>(
tns->shape, tns->type, tns->buf, tns->ldims, tns->offs, tns->pads,
-        tns->exported, tns->imported_rank, (int)this->tns_storage.size(),
-        tns->name));
+        tns->exported_sid, tns->imported_rank,
+        (int)this->tns_storage.size(), tns->name));
output_tensor = this->tns_storage.back().get();

this->tns_to_producer[output_tensor] = op_ptr;
@@ -224,6 +224,33 @@ bool Model::Impl::is_no_user(Tensor *tns) const {
return false;
}

+ bool Model::Impl::is_sid_alloced(int sid) const {
+     auto search = this->sid_to_info.find(sid);
+     return search != this->sid_to_info.end();
+ }
+
+ const Model::Impl::SidInfo &Model::Impl::get_sid(int sid) {
+     if (!this->is_sid_alloced(sid)) {
+         LOG(ERROR, "sid ", sid, " is not allocated");
+     }
+     return this->sid_to_info[sid];
+ }
+
+ const Model::Impl::SidInfo &Model::Impl::get_or_alloc_sid(int sid,
+                                                           int peer_rank) {
+     if (this->is_sid_alloced(sid)) {
+         const auto &info = this->sid_to_info[sid];
+         if (info.peer_rank != peer_rank) {
+             LOG(ERROR, "sid ", sid, " is allocated for peer rank ",
+                 info.peer_rank, " but requested for peer rank ", peer_rank);
+         }
+         return info;
+     }
+     // First use of this sid: register it for peer_rank.
+     this->sid_to_info[sid] = {sid, peer_rank};
+     return this->sid_to_info[sid];
+ }

std::list<const Op *> Model::Impl::get_leaf_ops() const {
std::list<const Op *> leaf_ops;
for (auto &op : this->ops_storage) {
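For a self-contained view of the registry contract these helpers implement,
here is a standalone analogue (a sketch, not ARK code; std::runtime_error
stands in for ARK's LOG(ERROR, ...) handling):

#include <cassert>
#include <map>
#include <stdexcept>

struct SidInfo {
    int sid;
    int peer_rank;
};

// Mimics Model::Impl's sid_to_info bookkeeping: a sid is registered on
// first use and must keep the same peer rank on every later lookup.
class SidRegistry {
   public:
    bool is_sid_alloced(int sid) const {
        return sid_to_info_.find(sid) != sid_to_info_.end();
    }
    const SidInfo &get_or_alloc_sid(int sid, int peer_rank) {
        auto it = sid_to_info_.find(sid);
        if (it != sid_to_info_.end()) {
            if (it->second.peer_rank != peer_rank) {
                throw std::runtime_error("sid reused with a different peer");
            }
            return it->second;
        }
        return sid_to_info_.emplace(sid, SidInfo{sid, peer_rank})
            .first->second;
    }

   private:
    std::map<int, SidInfo> sid_to_info_;
};

int main() {
    SidRegistry reg;
    const SidInfo &a = reg.get_or_alloc_sid(3, /*peer_rank=*/1);
    const SidInfo &b = reg.get_or_alloc_sid(3, /*peer_rank=*/1);
    assert(&a == &b && b.peer_rank == 1);  // same entry on repeat lookup
    return 0;
}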
27 changes: 25 additions & 2 deletions ark/model.h
@@ -95,6 +95,27 @@ class Model::Impl {
/// @param tns the @ref Tensor to query.
bool is_no_user(Tensor *tns) const;

+ struct SidInfo {
+     int sid;
+     int peer_rank;
+ };
+
+ /// Check if the sid is allocated.
+ /// @param sid the sid to query.
+ /// @return true if the sid is allocated, otherwise false.
+ bool is_sid_alloced(int sid) const;
+
+ /// Get the SidInfo for an allocated sid.
+ /// @param sid the sid to query.
+ /// @return the SidInfo.
+ const SidInfo &get_sid(int sid);
+
+ /// Get or allocate a SidInfo for a sid.
+ /// @param sid the sid to query.
+ /// @param peer_rank the peer rank of the sid.
+ /// @return the SidInfo.
+ const SidInfo &get_or_alloc_sid(int sid, int peer_rank);

/// Model graph analysis

/// Get a list of all operators that have no user.
@@ -120,8 +141,6 @@
protected:
/// Rank of this model.
int rank;
- /// Number of assigned EIDs.
- int next_eid = 0;

friend class Model;

Expand All @@ -143,6 +162,10 @@ class Model::Impl {
std::map<Tensor *, std::set<Op *>> tns_to_users;
/// Count the number of tensors that requested the same name.
std::map<std::string, int> name_cnts;
+ /// Map from a SID to its corresponding SidInfo.
+ std::map<int, SidInfo> sid_to_info;
+ /// Number of allocated SIDs.
+ int next_sid = 0;
};

} // namespace ark
4 changes: 2 additions & 2 deletions ark/ops/ops_all_gather.cc
@@ -26,7 +26,7 @@ std::vector<Tensor *> Model::all_gather(Tensor *input, int gpu_id, int gpu_num,

std::vector<Tensor *> result(gpu_num);

- int base = this->impl->next_eid;
+ int base = this->get_next_sid();
Tensor *prev_recv = nullptr;
for (int i = 1; i < gpu_num; i++) {
int gpu_dst = (gpu_id + i) % gpu_num;
@@ -55,7 +55,7 @@
result[gpu_src] = recv;
}
result[gpu_id] = this->identity(input, {prev_recv});
- this->impl->next_eid += gpu_num;
+ this->inc_next_sid(gpu_num);
return result;
}

4 changes: 2 additions & 2 deletions ark/ops/ops_all_reduce.cc
@@ -28,7 +28,7 @@ Tensor *Model::all_reduce(Tensor *input, int gpu_id, int gpu_num,
LOG(ERROR, "all_reduce of a split tensor is not supported");
}

- int base = this->impl->next_eid;
+ int base = this->get_next_sid();
Tensor *prev_recv = nullptr;
Tensor *cumulate = input;
for (int i = 1; i < gpu_num; i++) {
@@ -49,7 +49,7 @@
prev_recv = recv;
cumulate = this->add(cumulate, recv);
}
- this->impl->next_eid += gpu_num;
+ this->inc_next_sid(gpu_num);
return cumulate;
}

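Both collectives above reserve a contiguous block of SIDs: they read the
counter once as a base, derive one SID per peer exchange, and advance the
counter by gpu_num afterwards. A standalone sketch of that reservation
pattern (the get_next_sid()/inc_next_sid() accessors are assumed from the
call sites in this diff; the per-exchange SID formula is illustrative):

#include <iostream>

// Minimal stand-in for the Model-side SID counter.
struct SidCounter {
    int next_sid = 0;
    int get_next_sid() const { return next_sid; }
    void inc_next_sid(int n) { next_sid += n; }
};

int main() {
    SidCounter counter;
    const int gpu_num = 4;
    // Reserve a block: every peer exchange in this collective derives a
    // unique SID from `base`, so two collectives never share SIDs.
    int base = counter.get_next_sid();
    for (int i = 1; i < gpu_num; ++i) {
        std::cout << "exchange " << i << " uses sid " << base + i << "\n";
    }
    counter.inc_next_sid(gpu_num);  // the next collective starts at base + 4
    return 0;
}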
10 changes: 6 additions & 4 deletions ark/ops/ops_cast.cc
@@ -112,8 +112,9 @@ Tensor *Model::cast(Tensor *input, const TensorType &ttype, Tensor *output,
out_pads[last_dim] /= ttype.bytes();
}
return this->tensor(out_shape, ttype, input->buf, out_ldims,
-                     out_offs, out_pads, {input}, input->exported,
-                     input->imported_rank, name + "/cast");
+                     out_offs, out_pads, {input},
+                     input->exported_sid, input->imported_rank,
+                     name + "/cast");
}
if (ttype == BYTE) {
// Casting other types to BYTE is considered as a reshape.
@@ -128,8 +129,9 @@
out_pads[-1] *= input->type.bytes();
}
return this->tensor(out_shape, ttype, input->buf, out_ldims,
-                     out_offs, out_pads, {input}, input->exported,
-                     input->imported_rank, name + "/cast");
+                     out_offs, out_pads, {input},
+                     input->exported_sid, input->imported_rank,
+                     name + "/cast");
}
output = this->tensor(input->shape, ttype);
} else {
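The BYTE paths above treat the cast as a reshape: the innermost dimension
(and the matching ldims/pads entries) scales by the element size rather
than copying any data. A quick arithmetic sketch (shapes are illustrative):

#include <cstddef>
#include <iostream>

int main() {
    // Mirrors the scaling in ops_cast.cc: casting FP16 (2 bytes/element)
    // to BYTE multiplies the innermost dimension by type.bytes(), and
    // casting BYTE back to FP16 divides it.
    const std::size_t fp16_bytes = 2;
    std::size_t shape[2] = {4, 8};  // FP16 tensor shape
    std::size_t byte_shape[2] = {shape[0], shape[1] * fp16_bytes};
    std::cout << byte_shape[0] << " x " << byte_shape[1] << "\n";  // 4 x 16
    return 0;
}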
6 changes: 3 additions & 3 deletions ark/ops/ops_common.h
@@ -458,15 +458,15 @@ class RecvMMOp : public Op {

class SendOp : public Op {
public:
- SendOp(const std::string &prec_type, Tensor *input, int sid, int rank,
-        int dst_rank, size_t bytes, const std::string &name);
+ SendOp(const std::string &prec_type, Tensor *input, int rank, int dst_rank,
+        size_t bytes, const std::string &name);
std::string function_name(const OpConfig &cfg) const;
OpArgs function_call_args(const OpConfig &cfg) const;
};

class SendDoneOp : public Op {
public:
- SendDoneOp(const std::string &prec_type, Tensor *input, int sid, int rank,
+ SendDoneOp(const std::string &prec_type, Tensor *input, int rank,
int dst_rank, const std::string &name);
std::string function_name(const OpConfig &cfg) const;
OpArgs function_call_args(const OpConfig &cfg) const;
2 changes: 1 addition & 1 deletion ark/ops/ops_identity.cc
@@ -22,7 +22,7 @@ Tensor *Model::identity(Tensor *input, const std::vector<Tensor *> &deps,
dep_vec.emplace_back(dep);
}
return this->tensor(input->shape, input->type, input->buf, input->ldims,
-                     input->offs, input->pads, dep_vec, input->exported,
+                     input->offs, input->pads, dep_vec, input->exported_sid,
input->imported_rank, name + "/identity");
}
