diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index cc36d8415c..b1ea8c482c 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -51,12 +51,6 @@ logger = logging.getLogger(__name__) -## def log_to_file(message): -# current_time = datetime.now().strftime("%H:%M:%S") -# with open("/home/geiger_j/aiida_projects/aiida-dev/ssh-alive-testing/transport-log.txt", "a") as f: -# f.write(f"{current_time}: {message}(tasks.py)\n") - - class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" @@ -503,8 +497,6 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override process_status = f'Waiting for transport task: {self._command}' - # ## log_to_file(f'TRANSPORT_QUEUE: {transport_queue}') - node.set_process_status(process_status) try: diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 6bc796a7d4..56baeb9237 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -24,15 +24,6 @@ _LOGGER = logging.getLogger(__name__) -# Open and append to the log file at different points - -# Open and append to the log file, prepending the current time -## def log_to_file(message): -# datetime.now() -# current_time = datetime.now().strftime("%H:%M:%S") -# with open("/home/geiger_j/aiida_projects/aiida-dev/ssh-alive-testing/transport-log.txt", "a") as f: -# f.write(f"{current_time}: {message} (transports.py)\n") - class TransportRequest: """Information kept about request for a transport object""" @@ -41,7 +32,8 @@ def __init__(self): super().__init__() self.future: asyncio.Future = asyncio.Future() self.count = 0 - self.transport_closer = None + # ? What do I need this for? + # self.transport_closer = None class TransportQueue: @@ -60,10 +52,10 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): self._loop = loop if loop is not None else asyncio.get_event_loop() self._transport_requests: Dict[Hashable, TransportRequest] = {} self._last_open_time = None - # self._last_submission_time = None self._last_close_time = None + self._last_request_special: bool = False + # self._last_submission_time = None # self._last_transport_request: Dict[Hashable, str] = {} - # self._was_last_request_special: bool = False @property def loop(self) -> asyncio.AbstractEventLoop: @@ -85,11 +77,8 @@ async def transport_task(transport_queue, authinfo): :return: A future that can be yielded to give the transport """ open_callback_handle = None - close_callback_handle = None + # close_callback_handle = None transport_request = self._transport_requests.get(authinfo.pk, None) - # ## log_to_file(f'EventLoop: {asyncio.all_tasks(self.loop)}') - - # ## log_to_file(f'transport_request: {transport_request}') if transport_request is None: # There is no existing request for this transport (i.e. on this authinfo) @@ -102,7 +91,6 @@ async def transport_task(transport_queue, authinfo): # Check here if last_open_time > safe_interval, one could immediately open the transport # This should be the very first request, after a while - ## log_to_file(f'OPEN_CALLBACK_HANDLE BEFORE DO_OPEN: {open_callback_handle}') def do_open(): """Actually open the transport""" if transport_request.count > 0: @@ -111,7 +99,6 @@ def do_open(): try: transport.open() self._last_open_time = datetime.now() - ## log_to_file(f'LAST_OPEN_TIME: {self._last_open_time}') except Exception as exception: _LOGGER.error('exception occurred while trying to open transport:\n %s', exception) transport_request.future.set_exception(exception) @@ -127,87 +114,45 @@ def do_open(): # to this handle would otherwise keep the Process context (and thus the process itself) in memory. # See https://github.com/aiidateam/aiida-core/issues/4698 - # Pseudocode - # get_metadata from authinfo - # see if there is a last_close, if not None, compute seconds from then to now - # if < safe_interval, wait for difference - # if larger, call call_later, open call_later but with 0 - metadata = authinfo.get_metadata() - last_close_time = metadata.get('last_close_time') - - log_file_path = '/home/geiger_j/aiida_projects/thor-dev/transport-log.txt' + # if self._last_request_special: + # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + # self._last_request_special = False - debug_info = '\nDEBUG START\n' - - # if last_close_time is None: - # # Submit immediately -> This is not ever being triggered + # # First request, submit immediately + # # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? + # if self._last_close_time is None: # open_callback_handle = self._loop.call_later(1, do_open, context=contextvars.Context()) + # self._last_request_special = True - # debug_info += ( - # f"LAST_CLOSE_TIME_NONE: submit immediately\n" - # f"LAST_CLOSE_TIME: {last_close_time}\n" - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - # ) - - last_close_time = datetime.strptime(last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z') - timedelta_seconds = (timezone.localtime(timezone.now()) - last_close_time).total_seconds() - - if timedelta_seconds > safe_open_interval: - debug_info += ( - f'TIMEDELTA > SAFE_OPEN_INTERVAL: submit immediately\n' f'LAST_CLOSE_TIME: {last_close_time}\n' - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - ) + # # self._last_close_time = datetime.strptime(self._last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z') + # else: + # timedelta_seconds = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() - open_callback_handle = self._loop.call_later(0, do_open, context=contextvars.Context()) - # self._was_last_request_special = True + # if timedelta_seconds > safe_open_interval: + # # ! This could also be `_loop.call_soon` which has an implicit delay of 0s - else: - # If the last one was a special request, wait the safe_open_interval - debug_info += ( - f'TIMEDELTA < SAFE_OPEN_INTERVAL and last request special: submit after safe_open_interval\n' - f'LAST_CLOSE_TIME: {last_close_time}\n' - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - ) + # open_callback_handle = self._loop.call_later(timedelta_seconds-safe_open_interval, do_open, context=contextvars.Context()) + # self._last_request_special = True - open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + # else: + # # If the last one was a special request, wait the safe_open_interval + # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # if self._was_last_request_special: - - # # If the last one was a special request, wait the safe_open_interval - # debug_info += ( - # f"TIMEDELTA < SAFE_OPEN_INTERVAL and last request special: submit after safe_open_interval\n" - # f"LAST_CLOSE_TIME: {last_close_time}\n" - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - # ) + # if self._last_request_special: # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # self._was_last_request_special = False + # self._last_request_special = False # else: - - # # This is also a special request - # # Or, should it be? Could also remove the if/else, and just wait the safe_open_interval, as is the default - # debug_info += ( - # f"TIMEDELTA < SAFE_OPEN_INTERVAL and last request not special: submit after timedelta\n" - # f"LAST_CLOSE_TIME: {last_close_time}\n" - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - # f"TIMEDELTA: {timedelta_seconds}\n" - # ) - # open_callback_handle = self._loop.call_later(timedelta_seconds, do_open, context=contextvars.Context()) - # self._was_last_request_special = True + # self._last_request_special = True # open_callback_handle = self._loop.call_later(5, do_open, context=contextvars.Context()) # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - with open(log_file_path, 'a') as log_file: - log_file.write(debug_info) - try: transport_request.count += 1 self._last_submission_time = datetime.now() - ## log_to_file(f'LAST_SUBMISSION_TIME: {self._last_submission_time}') - ## log_to_file(f'TRANSPORT_REQUEST_COUNT: {transport_request.count}') yield transport_request.future except asyncio.CancelledError: # note this is only required in python<=3.7, @@ -218,60 +163,29 @@ def do_open(): _LOGGER.error('Exception whilst using transport:\n%s', traceback.format_exc()) raise finally: - ## log_to_file(f'FINALLY BLOCK - TRANSPORT_REQUEST.COUNT: {transport_request.count}') transport_request.count -= 1 assert transport_request.count >= 0, 'Transport request count dropped below 0!' # Check if there are no longer any users that want the transport if transport_request.count == 0: - ## log_to_file(f'TRANSPORT_REQUEST.FUTURE.DONE(): {transport_request.future.done()}') if transport_request.future.done(): - ## log_to_file(f'DATETIME: {(datetime.now() - self._last_open_time).total_seconds() > 5}') - - if (datetime.now() - self._last_open_time).total_seconds() > 5: - - def close_transport(): - transport_request.future.result().close() - """Close the transport if conditions are met.""" - ## log_to_file("CLOSE_TRANSPORT") - ## log_to_file(f"LAST_CLOSE_TIME: {self._last_close_time}") - - close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context()) - transport_request.transport_closer = close_callback_handle - # transport_request.transport_closer = None - # else: - # If not yet time to close, schedule again - # close_callback_handle = self._loop.call_later( - # 5, close_transport, context=contextvars.Context()) - # transport_request.transport_closer = close_callback_handle - # ## log_to_file(f"TRANSPORT_REQUEST.TRANSPORT_CLOSER: {transport_request.transport_closer}") - # if transport_request.transport_closer is None: - # ## log_to_file("INSIDE") - # self._last_close_time = datetime.now() - # else: - # return + # if (datetime.now() - self._last_open_time).total_seconds() > 5: - # This should be replaced with the call_later close_callback_handle invocation - # ## log_to_file(f"TRANSPORT_REQUEST.TRANSPORT_CLOSER: {transport_request.transport_closer}") + # def close_transport(): + # """Close the transport if conditions are met.""" + # transport_request.future.result().close() - transport_request.future.result().close() + # Also here logic when transport should be closed immediately, or when via call_later? + # close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context()) + # transport_request.transport_closer = close_callback_handle - # Get old metadata from authinfo, and set variable last_close_time to datetime now in isoformat - # Need to convert to str, otherwise not json-serializable when setting authinfo metadata - # if self._was_last_request_special is True: - last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z') - authinfo.set_metadata({'last_close_time': last_close_time}) - # else: - # # asyncio.sleep(5) - # transport_request.count += 1 - # self._was_last_request_special = True - # yield transport_request.future + # This should be replaced with the call_later close_callback_handle invocation + transport_request.future.result().close() + # When storing in `AuthInfo`, had to convert to str to be storeable in the DB + # self._last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z') + self._last_close_time = timezone.localtime(timezone.now()) elif open_callback_handle is not None: open_callback_handle.cancel() self._transport_requests.pop(authinfo.pk, None) - - -# Should wait first time 0, then always ~30 -# Try out with manual waiting times in between