add dcn 40M amp script #370

Draft: wants to merge 1 commit into base: main
153 changes: 95 additions & 58 deletions RecommenderSystems/dcn/dcn_train_eval.py
@@ -25,25 +25,27 @@ def str_list(x):
parser = argparse.ArgumentParser()
parser.add_argument("--data_dir", type=str, required=True)
parser.add_argument(
"--num_train_samples", type=int, default=36672493, help="the number of training samples"
"--num_train_samples", type=int, default=36672493, help="the number of training samples",
)
parser.add_argument(
"--num_valid_samples", type=int, default=4584062, help="the number of validation samples"
"--num_valid_samples", type=int, default=4584062, help="the number of validation samples",
)
parser.add_argument(
"--num_test_samples", type=int, default=4584062, help="the number of test samples"
"--num_test_samples", type=int, default=4584062, help="the number of test samples",
)

parser.add_argument("--shard_seed", type=int, default=2022)
parser.add_argument("--model_load_dir", type=str, default=None)
parser.add_argument("--model_save_dir", type=str, default=None)
parser.add_argument("--save_best_model", action="store_true", help="save best model or not")
parser.add_argument(
"--save_initial_model", action="store_true", help="save initial model parameters or not."
"--save_initial_model", action="store_true", help="save initial model parameters or not.",
)
parser.add_argument(
"--save_model_after_each_eval", action="store_true", help="save model after each eval."
"--save_model_after_each_eval", action="store_true", help="save model after each eval.",
)

parser.add_argument("--disable_fusedmlp", action="store_true", help="disable fused MLP or not")
parser.add_argument("--embedding_vec_size", type=int, default=16)
parser.add_argument("--batch_norm", type=bool, default=False)
parser.add_argument("--dnn_hidden_units", type=int_list, default="1000,1000,1000,1000,1000")
@@ -77,7 +79,7 @@ def str_list(x):
required=True,
)
parser.add_argument(
"--persistent_path", type=str, required=True, help="path for persistent kv store"
"--persistent_path", type=str, required=True, help="path for persistent kv store",
)
parser.add_argument("--store_type", type=str, default="cached_host_mem")
parser.add_argument("--cache_memory_budget_mb", type=int, default=8192)
@@ -126,7 +128,7 @@ def __init__(
self.shard_count = shard_count
self.cur_shard = cur_shard

fields = ["Label"]
fields = ["label"]
fields += [f"I{i+1}" for i in range(num_dense_fields)]
fields += [f"C{i+1}" for i in range(num_sparse_fields)]
self.fields = fields
@@ -262,53 +264,76 @@ def forward(self, ids):
return self.one_embedding.forward(ids)


class CrossInteractionLayer(nn.Module):
'''
Follows the CrossInteractionLayer implementation from FuxiCTR.
'''
def __init__(self, input_dim):
super(CrossInteractionLayer, self).__init__()
self.weight = nn.Linear(input_dim, 1, bias=False)
self.bias = nn.Parameter(flow.zeros(input_dim))

def forward(self, X_0, X_i):
interaction_out = self.weight(X_i) * X_0 + self.bias
return interaction_out


class CrossNet(nn.Module):
'''
Follows the CrossNet implementation from FuxiCTR.
'''
def __init__(self, input_dim, num_layers):
super(CrossNet, self).__init__()
self.num_layers = num_layers
self.cross_net = nn.ModuleList(
CrossInteractionLayer(input_dim) for _ in range(self.num_layers)
)
self.input_dim = input_dim
self.add_parameters()
self.reset_parameters()

def add_parameters(self) -> None:
for idx in range(self.num_layers):
self.register_parameter(
f"weight_{idx}", flow.nn.Parameter(flow.Tensor(1, self.input_dim,)),
)
self.register_parameter(
f"bias_{idx}", flow.nn.Parameter(flow.zeros(self.input_dim)),
)

def weight(self, i):
return getattr(self, f"weight_{i}")

def bias(self, i):
return getattr(self, f"bias_{i}")

def reset_parameters(self) -> None:
for i in range(self.num_layers):
flow.nn.init.kaiming_uniform_(self.weight(i), a=math.sqrt(5))

def forward(self, X_0):
X_i = X_0 # b x dim
for i in range(self.num_layers):
X_i = X_i + self.cross_net[i](X_0, X_i)
X_i = flow._C.fused_cross_feature_interaction(
X_i, self.weight(i), X_0, self.bias(i), "vector"
)
return X_i


class DNN(nn.Module):
def __init__(
self, input_dim, hidden_units=[], dropout_rates=0, batch_norm=False, use_bias=True,
self,
input_dim,
hidden_units=[],
dropout_rates=0,
use_fusedmlp=True,
batch_norm=False,
use_bias=True,
):
super(DNN, self).__init__()
dense_layers = []
hidden_units = [input_dim] + hidden_units
for idx in range(len(hidden_units) - 1):
dense_layers.append(nn.Linear(hidden_units[idx], hidden_units[idx + 1], bias=use_bias))
dense_layers.append(nn.ReLU())
if batch_norm:
dense_layers.append(nn.BatchNorm1d(hidden_units[idx + 1]))
if dropout_rates > 0:
dense_layers.append(nn.Dropout(p=dropout_rates))
self.dnn = nn.Sequential(*dense_layers) # * used to unpack list
if use_fusedmlp and not batch_norm:
hidden_dropout_rates_list = [dropout_rates] * (len(hidden_units) - 1)
self.dnn = nn.FusedMLP(
input_dim,
hidden_units[:-1],
hidden_units[-1],
hidden_dropout_rates_list,
dropout_rates,
False,
)
else:
hidden_units = [input_dim] + hidden_units
for idx in range(len(hidden_units) - 1):
dense_layers.append(
nn.Linear(hidden_units[idx], hidden_units[idx + 1], bias=use_bias)
)
dense_layers.append(nn.ReLU())
if batch_norm:
dense_layers.append(nn.BatchNorm1d(hidden_units[idx + 1]))
if dropout_rates > 0:
dense_layers.append(nn.Dropout(p=dropout_rates))
self.dnn = nn.Sequential(*dense_layers) # * used to unpack list

def forward(self, inputs):
return self.dnn(inputs)
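
Note: the fused call in CrossNet.forward replaces the deleted CrossInteractionLayer. A minimal unfused sketch of one cross layer, assuming the "vector" mode of flow._C.fused_cross_feature_interaction reproduces the FuxiCTR-style update x_{l+1} = x_0 * (w · x_l) + b + x_l that the removed code computed (the function name below is illustrative, not part of the PR):

import oneflow as flow

def cross_interaction_reference(x_i, weight, x_0, bias):
    # Unfused single cross layer: weight has shape (1, dim) and bias has shape (dim,),
    # matching CrossNet.add_parameters above; x_0 and x_i have shape (batch, dim).
    scalar = flow.matmul(x_i, weight.transpose(0, 1))  # (batch, 1)
    return x_0 * scalar + bias + x_i                   # broadcast over the feature dim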
@@ -324,6 +349,7 @@ def __init__(
cache_memory_budget_mb,
size_factor,
dnn_hidden_units=[128, 128],
use_fusedmlp=True,
crossing_layers=3,
net_dropout=0.2,
batch_norm=False,
@@ -347,6 +373,7 @@ def __init__(
input_dim=input_dim,
hidden_units=dnn_hidden_units,
dropout_rates=net_dropout,
use_fusedmlp=use_fusedmlp,
batch_norm=batch_norm,
use_bias=True,
)
@@ -374,7 +401,7 @@ def forward(self, X):
else:
final_out = cross_out
y_pred = self.fc(final_out)
return y_pred.sigmoid()
return y_pred

def reset_parameters(self):
def reset_param(m):
@@ -394,6 +421,7 @@ def make_dcn_module(args):
one_embedding_store_type=args.store_type,
cache_memory_budget_mb=args.cache_memory_budget_mb,
dnn_hidden_units=args.dnn_hidden_units,
use_fusedmlp=not args.disable_fusedmlp,
crossing_layers=args.crossing_layers,
net_dropout=args.net_dropout,
batch_norm=args.batch_norm,
@@ -411,7 +439,7 @@ def __init__(self, dcn_module, amp=False):

def build(self, features):
predicts = self.module(features.to("cuda"))
return predicts
return predicts.sigmoid()


class DCNTrainGraph(flow.nn.Graph):
Expand All @@ -430,22 +458,33 @@ def __init__(
self.set_grad_scaler(grad_scaler)

def build(self, labels, features):
logits = self.module(features.to("cuda"))
loss = self.loss(logits, labels.to("cuda"))
loss.backward()
return loss.to("cpu")

logits = self.module(features.to("cuda")).squeeze()
loss = self.loss(logits, labels.squeeze().to("cuda"))
reduce_loss = flow.mean(loss)
reduce_loss.backward()

return reduce_loss.to("cpu")

# def make_lr_scheduler(args, optimizer):
# batches_per_epoch = math.ceil(args.num_train_samples / args.train_batch_size)
# multistep_lr = flow.optim.lr_scheduler.MultiStepLR(
# optimizer, milestones=[3 * batches_per_epoch], gamma=args.lr_factor
# )
# return multistep_lr

def make_lr_scheduler(args, optimizer):
batches_per_epoch = math.ceil(args.num_train_samples / args.train_batch_size)
multistep_lr = flow.optim.lr_scheduler.MultiStepLR(
optimizer, milestones=[3 * batches_per_epoch], gamma=args.lr_factor
warmup_lr = flow.optim.lr_scheduler.LinearLR(
optimizer, start_factor=0, total_iters=3000,
)
return multistep_lr

poly_decay_lr = flow.optim.lr_scheduler.PolynomialLR(
optimizer, decay_batch=60000, end_learning_rate=1e-7, power=2.0, cycle=False,
)
sequential_lr = flow.optim.lr_scheduler.SequentialLR(
optimizer=optimizer,
schedulers=[warmup_lr, poly_decay_lr],
milestones=[2000],
interval_rescaling=True,
)
return sequential_lr

def train(args):
rank = flow.env.get_rank()
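
Note: the new make_lr_scheduler replaces the commented-out MultiStepLR with a 2,000-step linear warmup handed off to polynomial decay. A rough sketch of the resulting curve, assuming LinearLR and PolynomialLR follow the usual PyTorch-style formulas and that interval_rescaling=True restarts the second scheduler's counter at the milestone (an approximation, not the exact OneFlow semantics):

def approx_lr(step, base_lr=0.003):
    # Warmup: LinearLR with start_factor=0 ramps toward base_lr over 3000 steps,
    # but SequentialLR switches to the decay scheduler at step 2000.
    if step < 2000:
        return base_lr * step / 3000
    # PolynomialLR: quadratic decay over 60000 steps down to 1e-7,
    # counted from the milestone because interval_rescaling=True is assumed.
    t = min(step - 2000, 60000)
    return (base_lr - 1e-7) * (1 - t / 60000) ** 2.0 + 1e-7

For example, approx_lr(1000) gives 0.001 and approx_lr(62000) returns the 1e-7 floor.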
@@ -504,9 +543,9 @@ def early_stop(
print(f"Early stopping at epoch={epoch}!")
return stop_training, best_metric, stopping_steps, save_best

opt = flow.optim.Adam(dcn_module.parameters(), lr=args.learning_rate)
opt = flow.optim.Adam(dcn_module.parameters(), lr=args.learning_rate, eps=1e-4)
lr_scheduler = None
loss_func = flow.nn.BCELoss(reduction="none").to("cuda")
loss_func = flow.nn.BCEWithLogitsLoss(reduction="mean").to("cuda")

if args.loss_scale_policy == "static":
grad_scaler = flow.amp.StaticGradScaler(1024)
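
Note: because DCNModule.forward now returns raw logits (the sigmoid moved into DCNValGraph.build), the training loss switches from BCELoss to BCEWithLogitsLoss. Assuming OneFlow's loss modules mirror the usual PyTorch definitions, the two formulations agree up to numerical precision; a small self-contained check:

import oneflow as flow

logits = flow.tensor([-1.2, 0.3, 2.0, -0.5])
labels = flow.tensor([0.0, 1.0, 1.0, 0.0])

fused = flow.nn.BCEWithLogitsLoss(reduction="mean")(logits, labels)
unfused = flow.nn.BCELoss(reduction="mean")(logits.sigmoid(), labels)
print(fused.numpy(), unfused.numpy())  # expected: two nearly identical values, about 0.35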
@@ -550,15 +589,15 @@ def early_stop(
+ f"Latency {(latency * 1000):0.3f} ms, Throughput {throughput:0.1f}, {strtime}"
)

if step % batches_per_epoch == 0:
if step % 10000 == 0:
epoch += 1
val_auc, val_logloss = eval(
args,
eval_graph,
tag="val",
cur_step=step,
epoch=epoch,
cached_eval_batches=cached_valid_batches,
cached_eval_batches=None,
)
if args.save_model_after_each_eval:
save_model(f"step_{step}_val_auc_{val_auc:0.5f}")
@@ -677,6 +716,4 @@ def eval(args, eval_graph, tag="val", cur_step=0, epoch=0, cached_eval_batches=None):
os.system(sys.executable + " -m oneflow --doctor")
flow.boxing.nccl.enable_all_to_all(True)
args = get_args()
train(args)


train(args)
52 changes: 39 additions & 13 deletions RecommenderSystems/dcn/train.sh
@@ -1,7 +1,30 @@
DEVICE_NUM_PER_NODE=1
DATA_DIR=your_path/criteo_parquet
PERSISTENT_PATH=your_path/persistent1
MODEL_SAVE_DIR=your_path/model_save_dir
#!/bin/bash
DEVICE_NUM_PER_NODE=4
DATA_DIR=/RAID0/xiexuan/criteo1t_parquet_40M_long
PERSISTENT_PATH=/home/zhengzekang/models/RecommenderSystems/dlrm/init_model


rm -rf /home/zhengzekang/models/RecommenderSystems/dlrm/init_model/0-4/*
rm -rf /home/zhengzekang/models/RecommenderSystems/dlrm/init_model/1-4/*
rm -rf /home/zhengzekang/models/RecommenderSystems/dlrm/init_model/2-4/*
rm -rf /home/zhengzekang/models/RecommenderSystems/dlrm/init_model/3-4/*

export ONEFLOW_ONE_EMBEDDING_FUSED_MLP_ASYNC_GRAD=1

export ONEFLOW_ONE_EMBEDDING_FUSE_EMBEDDING_INTERACTION=1
export ONEFLOW_ONE_EMBEDDING_GRADIENT_SHUFFLE_USE_FP16=1
export ONEFLOW_FUSE_MODEL_UPDATE_CAST=1
export ONEFLOW_ENABLE_MULTI_TENSOR_MODEL_UPDATE=1
export ONEFLOW_KERNEL_ENABLE_CUDA_GRAPH=1
export ONEFLOW_EAGER_LOCAL_TO_GLOBAL_BALANCED_OVERRIDE=1
export ONEFLOW_ONE_EMBEDDING_USE_SYSTEM_GATHER=0
export ONEFLOW_EP_CUDA_DEVICE_SCHEDULE=2
export ONEFLOW_EP_CUDA_STREAM_NON_BLOCKING=1
export ONEFLOW_ONE_EMBEDDING_ADD_ID_SHUFFLE_COPY_OUT=1
export ONEFLOW_ONE_EMBEDDING_FUSE_EMBEDDING_INTERACTION=1


export ONEFLOW_PROFILER_KERNEL_PROFILE_KERNEL_FORWARD_RANGE=1

python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
@@ -10,15 +33,18 @@ python3 -m oneflow.distributed.launch \
--master_addr 127.0.0.1 \
dcn_train_eval.py \
--data_dir $DATA_DIR \
--model_save_dir $MODEL_SAVE_DIR \
--persistent_path $PERSISTENT_PATH \
--table_size_array "649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376,1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572" \
--store_type 'cached_host_mem' \
--cache_memory_budget_mb 2048 \
--table_size_array "62866,8001,2901,74623,7530,3391,1400,21705,7937,21,276,1235896,9659,39884301,39040,17291,7421,20263,3,7121,1543,63,38532372,2953790,403302,10,2209,11938,155,4,976,14,39979538,25638302,39665755,585840,12973,108,36" \
--store_type 'device_mem' \
--train_batch_size 55296 \
--train_batches 75000 \
--loss_print_interval 1000 \
--dnn_hidden_units "1000, 1000, 1000, 1000, 1000" \
--crossing_layers 4 \
--embedding_vec_size 16




--learning_rate 0.003 \
--embedding_vec_size 16 \
--num_train_samples 4195197692 \
--num_valid_samples 89137318 \
--num_test_samples 89137319 \
--net_dropout 0.05 \
--amp
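
Note: a quick sanity check of the new schedule, using the flag values above and the eval interval added in dcn_train_eval.py; the snippet is only illustrative arithmetic:

train_batches = 75_000
train_batch_size = 55_296
num_train_samples = 4_195_197_692

samples_seen = train_batches * train_batch_size   # 4,147,200,000
epochs = samples_seen / num_train_samples          # ~0.99, roughly one full pass over the data
evals = train_batches // 10_000                     # eval runs every 10,000 steps -> 7 evaluations
print(f"{epochs:.2f} epochs, {evals} evaluations")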