Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add verbosity setting to blocking submit #24

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions aiida_jutools/submit/blocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class BlockingSubmissionControllerSettings:
:param delete_if_stalling: True: delete nodes of 'stalling' top processes. Default True.
:param delete_if_stalling_dry_run: True: if delete_if_stalling, simulate delete_if_stalling to 'try it out'.
:param max_wait_for_stalling: delete top process (node & descendants) if running this long. To avoid congestion.
:param verbose: True: print status messages. Default True.
"""
dry_run: bool = True
max_top_processes_running: int = 30
Expand All @@ -57,6 +58,7 @@ class BlockingSubmissionControllerSettings:
delete_if_stalling: bool = False
delete_if_stalling_dry_run: bool = False
max_wait_for_stalling: int = 240
verbose: bool = True


class BlockingSubmissionController:
Expand Down Expand Up @@ -117,6 +119,9 @@ def submit(self,
:param groups: restrict to processes in a group or list of groups (optional)
:return: tuple (next process, is process from db True or from submit False) or (None,None) if submit failed
"""
# handle for settings
s = self.settings

self.__tmp_guard_against_delete_if_stalling()

wc_label = builder.metadata.label
Expand Down Expand Up @@ -150,13 +155,11 @@ def _is_duplicate(wc):
workchains_not_terminated = [proc for proc in workchains if
not proc.is_terminated] # created, waiting or running

if len(workchains) > 1:
if len(workchains) > 1 and s.verbose:
print(
f"INFO: '{wc_label}': found multiple ({len(workchains)}) results in group(s) "
f"{[group.label for group in groups]}, pks: {[wc.pk for wc in workchains]}")

# handle for settings
s = self.settings
seconds_per_min = 1 if s.dry_run else 60

def num_running(granularity: int):
Expand All @@ -174,7 +177,8 @@ def num_running(granularity: int):
# found A:B in db and finished_ok
next_process_is_from_db = True
next_process = _orm.load_node(workchains_finished_ok[0].pk)
print(f"loaded '{wc_label}' from db, finished_ok")
if s.verbose:
print(f"loaded '{wc_label}' from db, finished_ok")
else:
# not found A:B in db with state finished_ok. try submitting
if workchains_terminated and not s.resubmit_failed:
Expand All @@ -184,14 +188,16 @@ def num_running(granularity: int):
info = f"process state {next_process.attributes['process_state']}"
info = info if not next_process.attributes.get('exit_status', None) else \
info + f", exit status {next_process.attributes['exit_status']}"
print(f"loaded '{wc_label}' from db, {info}, (retry modus {s.resubmit_failed})")
if s.verbose:
print(f"loaded '{wc_label}' from db, {info}, (retry modus {s.resubmit_failed})")

elif workchains_not_terminated:
# found A:B in db with state not terminated, so it's currently in the queue already
next_process_is_from_db = False
next_process = _orm.load_node(workchains_not_terminated[0].pk)
self._submitted_top_processes.append(next_process)
print(f"'{wc_label}' is not terminated")
if s.verbose:
print(f"'{wc_label}' is not terminated")

else:
# not found A:B in db, so never submitted yet (or deleted since)
Expand Down Expand Up @@ -224,7 +230,8 @@ def num_running(granularity: int):
f"first found previously failed ('{wc_failed_first.label}', " \
f"{wc_failed_first.description}). Will use those supplied via builder.))"
info += " ..."
print(info)
if s.verbose:
print(info)

submitted = False

Expand Down Expand Up @@ -254,7 +261,8 @@ def stalling(wc):
if s.delete_if_stalling_dry_run else "deleting all its nodes"
info_msg = f"INFO: process pk={wc.pk} label='{wc.label}' exceeded max stalling " \
f"time {s.max_wait_for_stalling} min, {info_msg_suffix} ..."
print(info_msg)
if s.verbose:
print(info_msg)

if not s.delete_if_stalling_dry_run:
# note: we do not need to kill the top processnode's process first.
Expand All @@ -272,26 +280,30 @@ def stalling(wc):
_time.sleep(s.wait_for_submit * seconds_per_min)
else:
# process queue is not too full, can submit
print(f"try submit (waited {waited_for_submit} min, "
f"queued: {num_running(0)} top, {num_running(1)} all processes; "
f"wait another {s.wait_after_submit} minutes after submission)")
if s.verbose:
print(f"try submit (waited {waited_for_submit} min, "
f"queued: {num_running(0)} top, {num_running(1)} all processes; "
f"wait another {s.wait_after_submit} minutes after submission)")
if not s.dry_run:
next_process = _aiida_engine.submit(_builder)
self._submitted_top_processes.append(next_process)
for group in groups:
group.add_nodes([next_process])
print(f"submitted {wc_label}, pk {next_process.pk}")
if s.verbose:
print(f"submitted {wc_label}, pk {next_process.pk}")
_time.sleep(s.wait_after_submit * seconds_per_min)
else:
print(f"dry_run: would now submit {wc_label}")
if s.verbose:
print(f"dry_run: would now submit {wc_label}")
next_process = None
# submitted. exit waiting line
submitted = True
break
next_process_is_from_db = False

if not submitted:
print(f"WARNING: submission of '{wc_label}' timed out after {waited_for_submit} min waiting time.")
if s.verbose:
print(f"WARNING: submission of '{wc_label}' timed out after {waited_for_submit} min waiting time.")
next_process, next_process_is_from_db = None, None
return next_process, next_process_is_from_db

Expand Down