Skip to content

Commit

Permalink
Merge pull request #61 from pdobbelaere/main
Browse files Browse the repository at this point in the history
Create symlinks into psiflow_internal directory
  • Loading branch information
svandenhaute authored Dec 12, 2024
2 parents 0e5f68c + 9da0123 commit 0d33c73
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,9 @@ dmypy.json

pytest-tmp/
wandb/

# psiflow internal and its symlinks
psiflow_internal/
psiflow_log
psiflow_submit_scripts
psiflow_task_logs
34 changes: 26 additions & 8 deletions psiflow/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ def new_file(self, prefix: str, suffix: str) -> File:
@classmethod
def from_config(
cls,
path: Optional[Union[str, Path]] = None,
parsl_log_level: str = "WARNING",
usage_tracking: int = 3,
retries: int = 2,
Expand All @@ -457,10 +456,10 @@ def from_config(
container_engine: str = "apptainer",
container_addopts: str = " --no-eval -e --no-mount home -W /tmp --writable-tmpfs",
container_entrypoint: str = "/opt/entry.sh",
make_symlinks: bool = True,
**kwargs,
) -> ExecutionContext:
if path is None:
path = Path.cwd().resolve() / "psiflow_internal"
path = Path.cwd().resolve() / "psiflow_internal"
psiflow.resolve_and_check(path)
if path.exists():
shutil.rmtree(path)
Expand Down Expand Up @@ -548,7 +547,17 @@ def from_config(
internal_tasks_max_threads=internal_tasks_max_threads,
# std_autopath=std_autopath,
)
return ExecutionContext(config, definitions, path / "context_dir")
context = ExecutionContext(config, definitions, path / "context_dir")

if make_symlinks:
src, dest = Path.cwd() / f'psiflow_log', path / 'parsl.log'
_create_symlink(src, dest)
src, dest = Path.cwd() / f'psiflow_submit_scripts', path / '000' / 'submit_scripts'
_create_symlink(src, dest, is_dir=True)
src, dest = Path.cwd() / f'psiflow_task_logs', path / '000' / 'task_logs'
_create_symlink(src, dest, is_dir=True)

return context


class ExecutionContextLoader:
Expand All @@ -573,10 +582,6 @@ def load(
"use_threadpool": True,
},
}
path = Path.cwd() / ".psiflow_internal"
if path.exists():
shutil.rmtree(path)
psiflow_config["path"] = path
else:
assert len(sys.argv) == 2
path_config = psiflow.resolve_and_check(Path(sys.argv[1]))
Expand Down Expand Up @@ -666,3 +671,16 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s
class MyWorkQueueExecutor(WorkQueueExecutor):
def _get_launch_command(self, block_id):
return self.worker_command


def _create_symlink(src: Path, dest: Path, is_dir: bool = False) -> None:
"""Create or replace symbolic link"""
if src.is_symlink():
src.unlink()
if is_dir:
dest.mkdir(parents=True, exist_ok=True)
else:
dest.touch(exist_ok=True)
src.symlink_to(dest, target_is_directory=is_dir)


0 comments on commit 0d33c73

Please sign in to comment.