From 66070e7077cb454bcfd70dd3327b335499556a16 Mon Sep 17 00:00:00 2001 From: Philipp Ruessmann Date: Thu, 25 Nov 2021 10:04:19 +0000 Subject: [PATCH] Add verbosity setting to blocking submit --- aiida_jutools/submit/blocking.py | 40 +++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/aiida_jutools/submit/blocking.py b/aiida_jutools/submit/blocking.py index f463204..b470b6d 100644 --- a/aiida_jutools/submit/blocking.py +++ b/aiida_jutools/submit/blocking.py @@ -45,6 +45,7 @@ class BlockingSubmissionControllerSettings: :param delete_if_stalling: True: delete nodes of 'stalling' top processes. Default True. :param delete_if_stalling_dry_run: True: if delete_if_stalling, simulate delete_if_stalling to 'try it out'. :param max_wait_for_stalling: delete top process (node & descendants) if running this long. To avoid congestion. + :param verbose: True: print status messages. Default True. """ dry_run: bool = True max_top_processes_running: int = 30 @@ -57,6 +58,7 @@ class BlockingSubmissionControllerSettings: delete_if_stalling: bool = False delete_if_stalling_dry_run: bool = False max_wait_for_stalling: int = 240 + verbose: bool = True class BlockingSubmissionController: @@ -117,6 +119,9 @@ def submit(self, :param groups: restrict to processes in a group or list of groups (optional) :return: tuple (next process, is process from db True or from submit False) or (None,None) if submit failed """ + # handle for settings + s = self.settings + self.__tmp_guard_against_delete_if_stalling() wc_label = builder.metadata.label @@ -150,13 +155,11 @@ def _is_duplicate(wc): workchains_not_terminated = [proc for proc in workchains if not proc.is_terminated] # created, waiting or running - if len(workchains) > 1: + if len(workchains) > 1 and s.verbose: print( f"INFO: '{wc_label}': found multiple ({len(workchains)}) results in group(s) " f"{[group.label for group in groups]}, pks: {[wc.pk for wc in workchains]}") - # handle for settings - s = self.settings seconds_per_min = 1 if s.dry_run else 60 def num_running(granularity: int): @@ -174,7 +177,8 @@ def num_running(granularity: int): # found A:B in db and finished_ok next_process_is_from_db = True next_process = _orm.load_node(workchains_finished_ok[0].pk) - print(f"loaded '{wc_label}' from db, finished_ok") + if s.verbose: + print(f"loaded '{wc_label}' from db, finished_ok") else: # not found A:B in db with state finished_ok. try submitting if workchains_terminated and not s.resubmit_failed: @@ -184,14 +188,16 @@ def num_running(granularity: int): info = f"process state {next_process.attributes['process_state']}" info = info if not next_process.attributes.get('exit_status', None) else \ info + f", exit status {next_process.attributes['exit_status']}" - print(f"loaded '{wc_label}' from db, {info}, (retry modus {s.resubmit_failed})") + if s.verbose: + print(f"loaded '{wc_label}' from db, {info}, (retry modus {s.resubmit_failed})") elif workchains_not_terminated: # found A:B in db with state not terminated, so it's currently in the queue already next_process_is_from_db = False next_process = _orm.load_node(workchains_not_terminated[0].pk) self._submitted_top_processes.append(next_process) - print(f"'{wc_label}' is not terminated") + if s.verbose: + print(f"'{wc_label}' is not terminated") else: # not found A:B in db, so never submitted yet (or deleted since) @@ -224,7 +230,8 @@ def num_running(granularity: int): f"first found previously failed ('{wc_failed_first.label}', " \ f"{wc_failed_first.description}). Will use those supplied via builder.))" info += " ..." - print(info) + if s.verbose: + print(info) submitted = False @@ -254,7 +261,8 @@ def stalling(wc): if s.delete_if_stalling_dry_run else "deleting all its nodes" info_msg = f"INFO: process pk={wc.pk} label='{wc.label}' exceeded max stalling " \ f"time {s.max_wait_for_stalling} min, {info_msg_suffix} ..." - print(info_msg) + if s.verbose: + print(info_msg) if not s.delete_if_stalling_dry_run: # note: we do not need to kill the top processnode's process first. @@ -272,18 +280,21 @@ def stalling(wc): _time.sleep(s.wait_for_submit * seconds_per_min) else: # process queue is not too full, can submit - print(f"try submit (waited {waited_for_submit} min, " - f"queued: {num_running(0)} top, {num_running(1)} all processes; " - f"wait another {s.wait_after_submit} minutes after submission)") + if s.verbose: + print(f"try submit (waited {waited_for_submit} min, " + f"queued: {num_running(0)} top, {num_running(1)} all processes; " + f"wait another {s.wait_after_submit} minutes after submission)") if not s.dry_run: next_process = _aiida_engine.submit(_builder) self._submitted_top_processes.append(next_process) for group in groups: group.add_nodes([next_process]) - print(f"submitted {wc_label}, pk {next_process.pk}") + if s.verbose: + print(f"submitted {wc_label}, pk {next_process.pk}") _time.sleep(s.wait_after_submit * seconds_per_min) else: - print(f"dry_run: would now submit {wc_label}") + if s.verbose: + print(f"dry_run: would now submit {wc_label}") next_process = None # submitted. exit waiting line submitted = True @@ -291,7 +302,8 @@ def stalling(wc): next_process_is_from_db = False if not submitted: - print(f"WARNING: submission of '{wc_label}' timed out after {waited_for_submit} min waiting time.") + if s.verbose: + print(f"WARNING: submission of '{wc_label}' timed out after {waited_for_submit} min waiting time.") next_process, next_process_is_from_db = None, None return next_process, next_process_is_from_db