Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Myqueue conversion #182

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f5c0a14
Bump matplotlib from 3.4.3 to 3.5.0
dependabot[bot] Nov 17, 2021
fb36c1a
Bump furo from 2021.11.15 to 2021.11.16
dependabot[bot] Nov 17, 2021
5a4f804
Added first implementation of a myqueue conversion scheme for jobflow
davidwaroquiers Nov 18, 2021
60e3407
Merge branch 'main' of https://github.com/davidwaroquiers/jobflow
davidwaroquiers Nov 18, 2021
5278c62
Bump furo from 2021.11.16 to 2021.11.23
dependabot[bot] Nov 29, 2021
c6e1d4b
Bump maggma from 0.32.1 to 0.32.3
dependabot[bot] Nov 29, 2021
f31d757
Bump ipython from 7.29.0 to 7.30.0
dependabot[bot] Nov 29, 2021
96b5d6a
Bump sphinx from 4.3.0 to 4.3.1
dependabot[bot] Nov 29, 2021
2178d9c
Bump fireworks from 1.9.7 to 1.9.8
dependabot[bot] Dec 13, 2021
cd0c260
Bump ipython from 7.30.0 to 7.30.1
dependabot[bot] Dec 13, 2021
aa2be5b
Bump matplotlib from 3.5.0 to 3.5.1
dependabot[bot] Dec 13, 2021
312b86b
Bump m2r2 from 0.3.1 to 0.3.2
dependabot[bot] Dec 13, 2021
9ed307f
Bump sphinx from 4.3.1 to 4.3.2
dependabot[bot] Dec 20, 2021
02102a9
Bump nbsphinx from 0.8.7 to 0.8.8
dependabot[bot] Jan 3, 2022
b0350ac
Bump pydantic from 1.8.2 to 1.9.0
dependabot[bot] Jan 3, 2022
d8747aa
Bump furo from 2021.11.23 to 2022.1.2
dependabot[bot] Jan 3, 2022
13813fa
Bump monty from 2021.8.17 to 2021.12.1
dependabot[bot] Jan 3, 2022
52b073a
Bump autodoc-pydantic from 1.5.1 to 1.6.0
dependabot[bot] Jan 10, 2022
461589a
Bump ipython from 7.30.1 to 7.31.0
dependabot[bot] Jan 10, 2022
fc0880c
Bump sphinx from 4.3.2 to 4.4.0
dependabot[bot] Jan 17, 2022
80e17a0
Bump ipython from 7.31.0 to 8.0.0
dependabot[bot] Jan 17, 2022
3eaf8bd
Bump monty from 2021.12.1 to 2022.1.12.1
dependabot[bot] Jan 17, 2022
23baaf3
Bump ipython from 8.0.0 to 8.0.1
dependabot[bot] Jan 24, 2022
4675843
Bump monty from 2022.1.12.1 to 2022.1.19
dependabot[bot] Jan 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
monty==2021.8.17
monty==2022.1.19
networkx==2.6.3
pydash==5.1.0
maggma==0.32.1
pydantic==1.8.2
maggma==0.32.3
pydantic==1.9.0
PyYAML==6.0
18 changes: 9 additions & 9 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@
],
extras_require={
"docs": [
"sphinx==4.3.0",
"furo==2021.11.15",
"m2r2==0.3.1",
"ipython==7.29.0",
"nbsphinx==0.8.7",
"sphinx==4.4.0",
"furo==2022.1.2",
"m2r2==0.3.2",
"ipython==8.0.1",
"nbsphinx==0.8.8",
"nbsphinx-link==1.3.0",
"FireWorks==1.9.7",
"autodoc_pydantic==1.5.1",
"FireWorks==1.9.8",
"autodoc_pydantic==1.6.0",
],
"tests": [
"pytest==6.2.5",
"pytest-cov==3.0.0",
"FireWorks==1.9.7",
"matplotlib==3.4.3",
"FireWorks==1.9.8",
"matplotlib==3.5.1",
"pydot==1.4.2",
],
"dev": ["pre-commit>=2.12.1"],
Expand Down
110 changes: 110 additions & 0 deletions src/jobflow/managers/myqueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Tools for running :obj:`Flow` and :obj:`Job` objects using the Myqueue package.

Notes
-----
Myqueue heavily relies on the file system. To submit a workflow, one has to run:
mq workflow workflow.py DIRECTORY_PATTERNS
where workflow.py is a python script defining one workflow. For jobflow Flows, the
workflow.py file in myqueue_scripts has to be used.
"""

from __future__ import annotations

import json
import os
import typing
from datetime import datetime
from pathlib import Path
from random import randint

from monty.json import MontyDecoder, MontyEncoder
from monty.os import cd

if typing.TYPE_CHECKING:
from typing import List, Union

import jobflow

from maggma.stores import JSONStore

from jobflow import JobStore

__all__ = ["flow_to_myqueue", "run_myqueue_task"]

FLOW_JSON = "flow.json"
JOB_STORE_JSON = "job_store.json"


def flow_to_myqueue(
flow: Union[jobflow.Flow, jobflow.Job, List[jobflow.Job]],
):
"""
Convert a jobflow Flow to myqueue.

This is basically just dumping the jobflow Flow to a flow.json file.
The flow.json file is then read again when the user wants to submit
the workflow using myqueue.

Parameters
----------
flow
A flow or job.

"""
from jobflow.core.flow import get_flow

flow = get_flow(flow)
with open(FLOW_JSON, "w") as f:
json.dump(flow, f, cls=MontyEncoder, indent=2)


def run_myqueue_task(uuid):
"""
Run a job in myqueue.

Parameters
----------
uuid
Unique identifier of the job that needs to be executed.
"""
root_dir = Path.cwd()
# First get the jobflow Flow from the flow.json file
with open("flow.json", "r") as f:
flow = json.load(f, cls=MontyDecoder)

# Get the jobflow Job corresponding to the uuid
job = _get_job(flow, uuid)
job_dir = _get_job_dir(root_dir=root_dir)

# Initialize the store for output references
job_store_json_path = os.path.join(root_dir, JOB_STORE_JSON)
if not os.path.exists(job_store_json_path):
with open(job_store_json_path, "w") as f:
json.dump([], f)
store = JobStore(JSONStore(job_store_json_path, writable=True))
store.connect()

# Run the job
with cd(job_dir):
job.run(store=store)


def _get_job(flow, uuid):
myjob = None
for job, _ in flow.iterflow():
if job.uuid == uuid:
if myjob is not None:
raise RuntimeError(f"Multiple jobs with uuid {uuid}")
myjob = job
return myjob


def _get_job_dir(root_dir):
time_now = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S-%f")
job_dir = root_dir / f"job_{time_now}-{randint(10000, 99999)}"
job_dir.mkdir()
return job_dir


if __name__ == "__main__":
pass
26 changes: 26 additions & 0 deletions src/jobflow/managers/myqueue_scripts/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Template workflow.py file for running jobflow workflows in myqueue."""


import json

from monty.json import MontyDecoder
from myqueue.task import task


def create_tasks():
"""Create tasks for myqueue."""
# First reconstruct the jobflow Flow object
with open("flow.json", "r") as f:
flow = json.load(f, cls=MontyDecoder)

tasks = []
uuid2task = {}
for job, parents in flow.iterflow():
deps = [uuid2task[parent_uuid] for parent_uuid in parents]
t = task(
"jobflow.managers.myqueue@run_myqueue_task", args=[job.uuid], deps=deps
)
uuid2task[job.uuid] = t
tasks.append(t)

return tasks