Commit

Fix a bug that distributed tensors/embeddings are destroyed unexpectedly. (#27)

*Issue #, if available:*
When a model object is deep-copied, its DistTensor and DistEmbedding
objects are deep-copied with it. When such a local copy is destroyed,
the data on the remote server is destroyed as well. To fix the issue,
we should avoid deep-copying these objects when a model is deep-copied.

*Description of changes:*
Avoid deep copy of DistTensor and DistEmbedding objects.
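
To make the failure mode concrete, here is a minimal, self-contained sketch. RemoteHandle and SERVER are hypothetical stand-ins; per the description above, DistTensor/DistEmbedding handles destroy their remote data when a local handle is destroyed:

import copy

SERVER = {}  # stand-in for the remote key-value store

class RemoteHandle:
    """Hypothetical stand-in for a DistTensor/DistEmbedding handle."""
    def __init__(self, name):
        self.name = name
        SERVER[self.name] = "remote data"

    def __del__(self):
        # Destroying a local handle destroys the data on the server.
        SERVER.pop(self.name, None)

handle = RemoteHandle("emb")
clone = copy.deepcopy(handle)   # a deep copy is a second owning handle
del clone                       # destroying the copy wipes the remote data...
assert "emb" not in SERVER      # ...and leaves the original handle dangling

The fix below therefore stops deep-copying the model and instead tracks the path of the best saved checkpoint.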

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: Da Zheng <[email protected]>
Co-authored-by: Ubuntu <[email protected]>
Co-authored-by: xiang song(charlie.song) <[email protected]>
4 people authored Mar 21, 2023
1 parent 89154c9 commit e850be1
Showing 9 changed files with 109 additions and 31 deletions.
6 changes: 4 additions & 2 deletions examples/customized_models/HGT/hgt_nc.py
@@ -329,7 +329,9 @@ def main(args):
                 mini_batch_infer=True)
 
     # After training, get the best model from the trainer.
-    best_model = trainer.get_best_model()
+    best_model_path = trainer.get_best_model_path()
+    # TODO(zhengda) the model path has to be in a shared filesystem.
+    model.restore_model(best_model_path)
 
     # Create a dataset for inference.
     infer_data = GSgnnNodeInferData(config.graph_name, config.part_config,
@@ -338,7 +340,7 @@
                                     label_field=config.label_field)
 
     # Create an inference for a node task.
-    infer = GSgnnNodePredictionInfer(best_model, gs.get_rank())
+    infer = GSgnnNodePredictionInfer(model, gs.get_rank())
     infer.setup_cuda(dev_id=gs.get_rank())
     infer.setup_evaluator(evaluator)
     infer.setup_task_tracker(tracker)
1 change: 0 additions & 1 deletion python/graphstorm/dataloading/dataset.py
@@ -278,7 +278,6 @@ def prepare_data(self, g):
         pb = g.get_partition_book()
         if self.train_etypes is None:
             self._train_etypes = g.canonical_etypes
-            print(self.train_etypes)
         for canonical_etype in self.train_etypes:
             if 'train_mask' in g.edges[canonical_etype].data:
                 train_idx = dgl.distributed.edge_split(
3 changes: 3 additions & 0 deletions python/graphstorm/model/gnn.py
@@ -362,6 +362,9 @@ def restore_model(self, restore_model_path):
             # TODO(zhengda) we should load the sparse embeddings in parallel in the future.
             print('Load Sparse embedding from ', restore_model_path)
             load_sparse_embeds(restore_model_path, self.node_input_encoder)
+            # We need to make sure that the sparse embedding is completely loaded
+            # before all processes use the model.
+            th.distributed.barrier()
 
     def init_optimizer(self, lr, sparse_lr, weight_decay):
         """initialize the model's optimizers
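
The new barrier follows a load-then-synchronize pattern: restoring writes into embedding storage shared by all ranks, so no rank should start using the model before loading has finished everywhere. A runnable single-process sketch of the pattern (the stub and env settings are illustrative only; real runs reuse the training job's process group):

import os
import torch.distributed as dist

os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
dist.init_process_group("gloo", rank=0, world_size=1)

def load_sparse_embeds_stub():
    pass  # stand-in for load_sparse_embeds(restore_model_path, node_input_encoder)

load_sparse_embeds_stub()  # writes into shared DistEmbedding storage
dist.barrier()             # every rank blocks here until all loads complete
dist.destroy_process_group()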
20 changes: 12 additions & 8 deletions python/graphstorm/trainer/gsgnn_trainer.py
@@ -16,7 +16,6 @@
 GraphStorm trainer base
 """
 import os
-import copy
 import psutil
 import torch as th
 
@@ -52,7 +51,7 @@ def __init__(self, model, rank, topk_model_to_save=1):
         self._dev_id = -1
         self._evaluator = None
         self._task_tracker = None
-        self._best_model = None
+        self._best_model_path = None
 
         assert topk_model_to_save >= 0
         self._topklist = TopKList(topk_model_to_save) # A list to store the top k best
@@ -242,9 +241,6 @@ def save_topk_models(self, model, epoch, i, val_score, save_model_path):
             score_rank = 1
         else:
             score_rank = self.evaluator.get_val_score_rank(val_score)
-        # If this is the best model
-        if score_rank == 1:
-            self._best_model = copy.deepcopy(model.module).to("cpu")
 
         insert_success, (return_epoch, return_i) = self._topklist.insert(score_rank, (epoch, i))
 
@@ -260,10 +256,18 @@ def save_topk_models(self, model, epoch, i, val_score, save_model_path):
             # save this epoch and iteration's model and node embeddings
             self.save_model(model, epoch, i, save_model_path)
 
-    def get_best_model(self):
-        """ Return the best model.
+        # If this is the best model
+        if score_rank == 1 and save_model_path is not None:
+            self._best_model_path = self._gen_model_path(save_model_path, epoch, i)
+
+    def get_best_model_path(self):
+        """ Return the path of the best model.
         """
-        return self._best_model
+        assert self._best_model_path is not None, "Cannot get the best model from the trainer."
+        assert os.path.exists(self._best_model_path), \
+            f"The model path {self._best_model_path} does not exist. " \
+            + "Please make sure that the model is saved in a shared filesystem."
+        return self._best_model_path
 
     def _gen_model_path(self, base_path, epoch, i):
         """
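
Callers now retrieve a checkpoint path and reload it rather than receiving an in-memory copy. A condensed sketch of the new flow, mirroring the training-script changes below (trainer, model, and train_data are assumed from the surrounding scripts):

best_model_path = trainer.get_best_model_path()
# The path must be on a filesystem shared by all ranks.
model.restore_model(best_model_path)
model.prepare_input_encoder(train_data)
embeddings = do_full_graph_inference(model, train_data, task_tracker=tracker)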
22 changes: 18 additions & 4 deletions training_scripts/gsgnn_ep/gsgnn_ep.py
@@ -16,6 +16,7 @@
 GSgnn edge prediction.
 """
 
+import os
 import torch as th
 import graphstorm as gs
 from graphstorm.config import get_argument_parser
@@ -100,17 +101,30 @@ def main(args):
     # The input layer can pre-compute node features in the preparing step if needed.
     # For example pre-compute all BERT embeddings
     model.prepare_input_encoder(train_data)
+    if config.save_model_path is not None:
+        save_model_path = config.save_model_path
+    elif config.save_embed_path is not None:
+        # If we need to save embeddings, we need to save the model somewhere.
+        save_model_path = os.path.join(config.save_embed_path, "model")
+    else:
+        save_model_path = None
     trainer.fit(train_loader=dataloader, val_loader=val_dataloader,
                 test_loader=test_dataloader, n_epochs=config.n_epochs,
-                save_model_path=config.save_model_path,
+                save_model_path=save_model_path,
                 mini_batch_infer=config.mini_batch_infer,
                 save_model_per_iters=config.save_model_per_iters,
                 save_perf_results_path=config.save_perf_results_path)
 
     if config.save_embed_path is not None:
-        best_model = trainer.get_best_model().to(device)
-        assert best_model is not None, "Cannot get the best model from the trainer."
-        embeddings = do_full_graph_inference(best_model, train_data, task_tracker=tracker)
+        model = gs.create_builtin_edge_gnn_model(train_data.g, config, train_task=False)
+        best_model_path = trainer.get_best_model_path()
+        # TODO(zhengda) the model path has to be in a shared filesystem.
+        model.restore_model(best_model_path)
+        # Preparing input layer for training or inference.
+        # The input layer can pre-compute node features in the preparing step if needed.
+        # For example pre-compute all BERT embeddings
+        model.prepare_input_encoder(train_data)
+        embeddings = do_full_graph_inference(model, train_data, task_tracker=tracker)
         save_embeddings(config.save_embed_path, embeddings, gs.get_rank(),
                         th.distributed.get_world_size())
22 changes: 18 additions & 4 deletions training_scripts/gsgnn_ep/gsgnn_lm_ep.py
@@ -16,6 +16,7 @@
 GSgnn edge prediction.
 """
 
+import os
 import torch as th
 import graphstorm as gs
 from graphstorm.config import get_argument_parser
@@ -96,18 +97,31 @@ def main(args):
     # The input layer can pre-compute node features in the preparing step if needed.
     # For example pre-compute all BERT embeddings
     model.prepare_input_encoder(train_data)
+    if config.save_model_path is not None:
+        save_model_path = config.save_model_path
+    elif config.save_embed_path is not None:
+        # If we need to save embeddings, we need to save the model somewhere.
+        save_model_path = os.path.join(config.save_embed_path, "model")
+    else:
+        save_model_path = None
     trainer.fit(train_loader=dataloader, val_loader=val_dataloader,
                 test_loader=test_dataloader, n_epochs=config.n_epochs,
-                save_model_path=config.save_model_path,
+                save_model_path=save_model_path,
                 mini_batch_infer=config.mini_batch_infer,
                 save_model_per_iters=config.save_model_per_iters,
                 save_perf_results_path=config.save_perf_results_path,
                 freeze_input_layer_epochs=config.freeze_lm_encoder_epochs)
 
     if config.save_embed_path is not None:
-        best_model = trainer.get_best_model().to(device)
-        assert best_model is not None, "Cannot get the best model from the trainer."
-        embeddings = do_full_graph_inference(best_model, train_data, task_tracker=tracker)
+        model = gs.create_builtin_edge_model(train_data.g, config, train_task=False)
+        best_model_path = trainer.get_best_model_path()
+        # TODO(zhengda) the model path has to be in a shared filesystem.
+        model.restore_model(best_model_path)
+        # Preparing input layer for training or inference.
+        # The input layer can pre-compute node features in the preparing step if needed.
+        # For example pre-compute all BERT embeddings
+        model.prepare_input_encoder(train_data)
+        embeddings = do_full_graph_inference(model, train_data, task_tracker=tracker)
         save_embeddings(config.save_embed_path, embeddings, gs.get_rank(),
                         th.distributed.get_world_size())
22 changes: 18 additions & 4 deletions training_scripts/gsgnn_lp/gsgnn_lm_lp.py
@@ -16,6 +16,7 @@
 GSgnn pure gpu link prediction.
 """
 
+import os
 import torch as th
 import graphstorm as gs
 from graphstorm.config import get_argument_parser
@@ -107,19 +108,32 @@ def main(args):
     # The input layer can pre-compute node features in the preparing step if needed.
     # For example pre-compute all BERT embeddings
     model.prepare_input_encoder(train_data)
+    if config.save_model_path is not None:
+        save_model_path = config.save_model_path
+    elif config.save_embed_path is not None:
+        # If we need to save embeddings, we need to save the model somewhere.
+        save_model_path = os.path.join(config.save_embed_path, "model")
+    else:
+        save_model_path = None
     trainer.fit(train_loader=dataloader, val_loader=val_dataloader,
                 test_loader=test_dataloader, n_epochs=config.n_epochs,
-                save_model_path=config.save_model_path,
+                save_model_path=save_model_path,
                 mini_batch_infer=config.mini_batch_infer,
                 save_model_per_iters=config.save_model_per_iters,
                 save_perf_results_path=config.save_perf_results_path,
                 freeze_input_layer_epochs=config.freeze_lm_encoder_epochs)
 
     if config.save_embed_path is not None:
-        best_model = trainer.get_best_model().to(device)
-        assert best_model is not None, "Cannot get the best model from the trainer."
+        model = gs.create_builtin_lp_model(train_data.g, config, train_task=False)
+        best_model_path = trainer.get_best_model_path()
+        # TODO(zhengda) the model path has to be in a shared filesystem.
+        model.restore_model(best_model_path)
+        # Preparing input layer for training or inference.
+        # The input layer can pre-compute node features in the preparing step if needed.
+        # For example pre-compute all BERT embeddings
+        model.prepare_input_encoder(train_data)
         # TODO(zhengda) we may not want to only use training edges to generate GNN embeddings.
-        embeddings = do_full_graph_inference(best_model, train_data,
+        embeddings = do_full_graph_inference(model, train_data,
                                              edge_mask="train_mask", task_tracker=tracker)
         save_embeddings(config.save_embed_path, embeddings, gs.get_rank(),
                         th.distributed.get_world_size())
22 changes: 18 additions & 4 deletions training_scripts/gsgnn_lp/gsgnn_lp.py
@@ -16,6 +16,7 @@
 GSgnn pure gpu link prediction.
 """
 
+import os
 import torch as th
 import graphstorm as gs
 from graphstorm.config import get_argument_parser
@@ -109,18 +110,31 @@ def main(args):
     # The input layer can pre-compute node features in the preparing step if needed.
     # For example pre-compute all BERT embeddings
     model.prepare_input_encoder(train_data)
+    if config.save_model_path is not None:
+        save_model_path = config.save_model_path
+    elif config.save_embed_path is not None:
+        # If we need to save embeddings, we need to save the model somewhere.
+        save_model_path = os.path.join(config.save_embed_path, "model")
+    else:
+        save_model_path = None
     trainer.fit(train_loader=dataloader, val_loader=val_dataloader,
                 test_loader=test_dataloader, n_epochs=config.n_epochs,
-                save_model_path=config.save_model_path,
+                save_model_path=save_model_path,
                 mini_batch_infer=config.mini_batch_infer,
                 save_model_per_iters=config.save_model_per_iters,
                 save_perf_results_path=config.save_perf_results_path)
 
     if config.save_embed_path is not None:
-        best_model = trainer.get_best_model().to(device)
-        assert best_model is not None, "Cannot get the best model from the trainer."
+        model = gs.create_builtin_lp_gnn_model(train_data.g, config, train_task=False)
+        best_model_path = trainer.get_best_model_path()
+        # TODO(zhengda) the model path has to be in a shared filesystem.
+        model.restore_model(best_model_path)
+        # Preparing input layer for training or inference.
+        # The input layer can pre-compute node features in the preparing step if needed.
+        # For example pre-compute all BERT embeddings
+        model.prepare_input_encoder(train_data)
        # TODO(zhengda) we may not want to only use training edges to generate GNN embeddings.
-        embeddings = do_full_graph_inference(best_model, train_data,
+        embeddings = do_full_graph_inference(model, train_data,
                                              edge_mask="train_mask", task_tracker=tracker)
         save_embeddings(config.save_embed_path, embeddings, gs.get_rank(),
                         th.distributed.get_world_size())
22 changes: 18 additions & 4 deletions training_scripts/gsgnn_np/gsgnn_np.py
@@ -16,6 +16,7 @@
 GSgnn node prediction.
 """
 
+import os
 import torch as th
 import graphstorm as gs
 from graphstorm.config import get_argument_parser
@@ -91,17 +92,30 @@ def main(args):
     # The input layer can pre-compute node features in the preparing step if needed.
     # For example pre-compute all BERT embeddings
     model.prepare_input_encoder(train_data)
+    if config.save_model_path is not None:
+        save_model_path = config.save_model_path
+    elif config.save_embed_path is not None:
+        # If we need to save embeddings, we need to save the model somewhere.
+        save_model_path = os.path.join(config.save_embed_path, "model")
+    else:
+        save_model_path = None
     trainer.fit(train_loader=dataloader, val_loader=val_dataloader,
                 test_loader=test_dataloader, n_epochs=config.n_epochs,
-                save_model_path=config.save_model_path,
+                save_model_path=save_model_path,
                 mini_batch_infer=config.mini_batch_infer,
                 save_model_per_iters=config.save_model_per_iters,
                 save_perf_results_path=config.save_perf_results_path)
 
     if config.save_embed_path is not None:
-        best_model = trainer.get_best_model().to(device)
-        assert best_model is not None, "Cannot get the best model from the trainer."
-        embeddings = do_full_graph_inference(best_model, train_data, task_tracker=tracker)
+        model = gs.create_builtin_node_gnn_model(train_data.g, config, train_task=False)
+        best_model_path = trainer.get_best_model_path()
+        # TODO(zhengda) the model path has to be in a shared filesystem.
+        model.restore_model(best_model_path)
+        # Preparing input layer for training or inference.
+        # The input layer can pre-compute node features in the preparing step if needed.
+        # For example pre-compute all BERT embeddings
+        model.prepare_input_encoder(train_data)
+        embeddings = do_full_graph_inference(model, train_data, task_tracker=tracker)
         save_embeddings(config.save_embed_path, embeddings, gs.get_rank(),
                         th.distributed.get_world_size())
