-
Notifications
You must be signed in to change notification settings - Fork 2
/
test.py
121 lines (97 loc) · 4.21 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# test.py
import pytorch_lightning as pl
import argparse
import yaml
import os
from pl_modules.citywalk_datamodule import CityWalkDataModule
from pl_modules.teleop_datamodule import TeleopDataModule
from pl_modules.citywalker_module import CityWalkerModule
from pl_modules.citywalker_feat_module import CityWalkerFeatModule
import torch
import glob
torch.set_float32_matmul_precision('medium')
pl.seed_everything(42, workers=True)
class DictNamespace(argparse.Namespace):
def __init__(self, **kwargs):
for key, value in kwargs.items():
if isinstance(value, dict):
setattr(self, key, DictNamespace(**value))
else:
setattr(self, key, value)
def parse_args():
parser = argparse.ArgumentParser(description='Test UrbanNav model')
parser.add_argument('--config', type=str, default='config/default.yaml', help='Path to config file')
parser.add_argument('--checkpoint', type=str, default=None, help='Path to model checkpoint. If not provided, the latest checkpoint will be used.')
parser.add_argument('--save_predictions', action='store_true', help='Whether to save predictions')
args = parser.parse_args()
return args
def load_config(config_path):
with open(config_path, 'r') as f:
cfg_dict = yaml.safe_load(f)
cfg = DictNamespace(**cfg_dict)
return cfg
def find_latest_checkpoint(checkpoint_dir):
"""
Finds the latest checkpoint in the given directory based on modification time.
Args:
checkpoint_dir (str): Path to the directory containing checkpoints.
Returns:
str: Path to the latest checkpoint file.
Raises:
FileNotFoundError: If no checkpoint files are found in the directory.
"""
checkpoint_pattern = os.path.join(checkpoint_dir, '*.ckpt')
checkpoint_files = glob.glob(checkpoint_pattern)
if not checkpoint_files:
raise FileNotFoundError(f"No checkpoint files found in directory: {checkpoint_dir}")
# Sort checkpoints by modification time (latest first)
checkpoint_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
latest_checkpoint = checkpoint_files[0]
return latest_checkpoint
def main():
args = parse_args()
cfg = load_config(args.config)
# Create a directory for test results
test_dir = os.path.join(cfg.project.result_dir, cfg.project.run_name, 'test')
os.makedirs(test_dir, exist_ok=True)
# Initialize the DataModule
if cfg.data.type == 'citywalk':
datamodule = CityWalkDataModule(cfg)
elif cfg.data.type == 'teleop':
datamodule = TeleopDataModule(cfg)
else:
raise ValueError(f"Invalid dataset: {cfg.data.dataset}")
# Determine the checkpoint path
if args.checkpoint:
checkpoint_path = args.checkpoint
if not os.path.isfile(checkpoint_path):
raise FileNotFoundError(f"Checkpoint not found at {checkpoint_path}")
else:
# Automatically find the latest checkpoint
checkpoint_dir = os.path.join(cfg.project.result_dir, cfg.project.run_name, 'checkpoints')
if not os.path.isdir(checkpoint_dir):
raise FileNotFoundError(f"Checkpoint directory does not exist: {checkpoint_dir}")
checkpoint_path = find_latest_checkpoint(checkpoint_dir)
print(f"No checkpoint specified. Using the latest checkpoint: {checkpoint_path}")
# Load the model from the checkpoint
if cfg.model.type == 'citywalker':
model = CityWalkerModule.load_from_checkpoint(checkpoint_path, cfg=cfg)
elif cfg.model.type == 'citywalker_feat':
model = CityWalkerFeatModule.load_from_checkpoint(checkpoint_path, cfg=cfg)
else:
raise ValueError(f"Invalid model: {cfg.model.type}")
model.result_dir = test_dir
print(f"Loaded model from checkpoint: {checkpoint_path}")
# Initialize Trainer
trainer = pl.Trainer(
default_root_dir=test_dir,
devices=cfg.training.gpus,
precision='16-mixed' if cfg.training.amp else 32,
accelerator='ddp' if cfg.training.gpus > 1 else 'gpu',
logger=False
# You can add more Trainer arguments if needed
)
# Run testing
trainer.test(model, datamodule=datamodule, verbose=True)
if __name__ == '__main__':
main()