From 66070e7077cb454bcfd70dd3327b335499556a16 Mon Sep 17 00:00:00 2001
From: Philipp Ruessmann
Date: Thu, 25 Nov 2021 10:04:19 +0000
Subject: [PATCH] Add verbosity setting to blocking submit
---
aiida_jutools/submit/blocking.py | 40 +++++++++++++++++++++-----------
1 file changed, 26 insertions(+), 14 deletions(-)
diff --git a/aiida_jutools/submit/blocking.py b/aiida_jutools/submit/blocking.py
index f463204..b470b6d 100644
--- a/aiida_jutools/submit/blocking.py
+++ b/aiida_jutools/submit/blocking.py
@@ -45,6 +45,7 @@ class BlockingSubmissionControllerSettings:
:param delete_if_stalling: True: delete nodes of 'stalling' top processes. Default True.
:param delete_if_stalling_dry_run: True: if delete_if_stalling, simulate delete_if_stalling to 'try it out'.
:param max_wait_for_stalling: delete top process (node & descendants) if running this long. To avoid congestion.
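+ :param verbose: True: print status messages. False: suppress them, including the 'INFO:' messages and the submission-timeout 'WARNING:' printed by submit. Default True.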
"""
dry_run: bool = True
max_top_processes_running: int = 30
@@ -57,6 +58,7 @@ class BlockingSubmissionControllerSettings:
delete_if_stalling: bool = False
delete_if_stalling_dry_run: bool = False
max_wait_for_stalling: int = 240
+ verbose: bool = True
class BlockingSubmissionController:
@@ -117,6 +119,9 @@ def submit(self,
:param groups: restrict to processes in a group or list of groups (optional)
:return: tuple (next process, is process from db True or from submit False) or (None,None) if submit failed
"""
+ # handle for settings
+ s = self.settings
+
self.__tmp_guard_against_delete_if_stalling()
wc_label = builder.metadata.label
@@ -150,13 +155,11 @@ def _is_duplicate(wc):
workchains_not_terminated = [proc for proc in workchains if
not proc.is_terminated] # created, waiting or running
- if len(workchains) > 1:
+ if len(workchains) > 1 and s.verbose:
print(
f"INFO: '{wc_label}': found multiple ({len(workchains)}) results in group(s) "
f"{[group.label for group in groups]}, pks: {[wc.pk for wc in workchains]}")
- # handle for settings
- s = self.settings
seconds_per_min = 1 if s.dry_run else 60
def num_running(granularity: int):
@@ -174,7 +177,8 @@ def num_running(granularity: int):
# found A:B in db and finished_ok
next_process_is_from_db = True
next_process = _orm.load_node(workchains_finished_ok[0].pk)
- print(f"loaded '{wc_label}' from db, finished_ok")
+ if s.verbose:
+ print(f"loaded '{wc_label}' from db, finished_ok")
else:
# not found A:B in db with state finished_ok. try submitting
if workchains_terminated and not s.resubmit_failed:
@@ -184,14 +188,16 @@ def num_running(granularity: int):
info = f"process state {next_process.attributes['process_state']}"
info = info if not next_process.attributes.get('exit_status', None) else \
info + f", exit status {next_process.attributes['exit_status']}"
- print(f"loaded '{wc_label}' from db, {info}, (retry modus {s.resubmit_failed})")
+ if s.verbose:
+ print(f"loaded '{wc_label}' from db, {info}, (retry modus {s.resubmit_failed})")
elif workchains_not_terminated:
# found A:B in db with state not terminated, so it's currently in the queue already
next_process_is_from_db = False
next_process = _orm.load_node(workchains_not_terminated[0].pk)
self._submitted_top_processes.append(next_process)
- print(f"'{wc_label}' is not terminated")
+ if s.verbose:
+ print(f"'{wc_label}' is not terminated")
else:
# not found A:B in db, so never submitted yet (or deleted since)
@@ -224,7 +230,8 @@ def num_running(granularity: int):
f"first found previously failed ('{wc_failed_first.label}', " \
f"{wc_failed_first.description}). Will use those supplied via builder.))"
info += " ..."
- print(info)
+ if s.verbose:
+ print(info)
submitted = False
@@ -254,7 +261,8 @@ def stalling(wc):
if s.delete_if_stalling_dry_run else "deleting all its nodes"
info_msg = f"INFO: process pk={wc.pk} label='{wc.label}' exceeded max stalling " \
f"time {s.max_wait_for_stalling} min, {info_msg_suffix} ..."
- print(info_msg)
+ if s.verbose:
+ print(info_msg)
if not s.delete_if_stalling_dry_run:
# note: we do not need to kill the top processnode's process first.
@@ -272,18 +280,21 @@ def stalling(wc):
_time.sleep(s.wait_for_submit * seconds_per_min)
else:
# process queue is not too full, can submit
- print(f"try submit (waited {waited_for_submit} min, "
- f"queued: {num_running(0)} top, {num_running(1)} all processes; "
- f"wait another {s.wait_after_submit} minutes after submission)")
+ if s.verbose:
+ print(f"try submit (waited {waited_for_submit} min, "
+ f"queued: {num_running(0)} top, {num_running(1)} all processes; "
+ f"wait another {s.wait_after_submit} minutes after submission)")
if not s.dry_run:
next_process = _aiida_engine.submit(_builder)
self._submitted_top_processes.append(next_process)
for group in groups:
group.add_nodes([next_process])
- print(f"submitted {wc_label}, pk {next_process.pk}")
+ if s.verbose:
+ print(f"submitted {wc_label}, pk {next_process.pk}")
_time.sleep(s.wait_after_submit * seconds_per_min)
else:
- print(f"dry_run: would now submit {wc_label}")
+ if s.verbose:
+ print(f"dry_run: would now submit {wc_label}")
next_process = None
# submitted. exit waiting line
submitted = True
@@ -291,7 +302,8 @@ def stalling(wc):
next_process_is_from_db = False
if not submitted:
- print(f"WARNING: submission of '{wc_label}' timed out after {waited_for_submit} min waiting time.")
+ if s.verbose:
+ print(f"WARNING: submission of '{wc_label}' timed out after {waited_for_submit} min waiting time.")
next_process, next_process_is_from_db = None, None
return next_process, next_process_is_from_db