From d0e4777dc72d102eeaa9e7dbc13a06f406ae7f2c Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Mon, 4 Nov 2024 13:50:41 +0100 Subject: [PATCH] Reset `tasks.py` and restrict `transports.py` changes to `last_close_time`. --- src/aiida/engine/processes/calcjobs/tasks.py | 36 ++-------- src/aiida/engine/transports.py | 75 +++----------------- 2 files changed, 14 insertions(+), 97 deletions(-) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 58647f4a70..2c030ba878 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -54,6 +54,7 @@ class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" + async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, cancellable: InterruptableFuture): """Transport task that will attempt to upload the files of a job calculation to the remote. @@ -140,36 +141,11 @@ async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, ca max_attempts = get_config_option(MAX_ATTEMPTS_OPTION) authinfo = node.get_authinfo() - # authinfo_pk = authinfo.pk - - # transport_request = transport_queue._transport_requests.get(authinfo.pk, None) - # open_transport = transport_queue._open_transports.get(authinfo.pk, None) - - # if open_transport is not None: # and not transport_queue._last_request_special: - # transport = open_transport - # transport_queue._last_request_special = True - # elif transport_request is None: # or transport_queue._last_request_special: - # # This is the previous behavior - # with transport_queue.request_transport(authinfo) as request: - # transport = await cancellable.with_interrupt(request) - # else: - # pass - - async def do_submit(): - transport_request = transport_queue._transport_requests.get(authinfo.pk, None) - open_transport = transport_queue._open_transports.get(authinfo.pk, None) - - if open_transport is not None: # and not transport_queue._last_request_special: - transport = open_transport - # transport_queue._last_request_special = True - elif transport_request is None: # or transport_queue._last_request_special: - # This is the previous behavior - with transport_queue.request_transport(authinfo) as request: - transport = await cancellable.with_interrupt(request) - else: - pass - return execmanager.submit_calculation(node, transport) + async def do_submit(): + with transport_queue.request_transport(authinfo) as request: + transport = await cancellable.with_interrupt(request) + return execmanager.submit_calculation(node, transport) try: logger.info(f'scheduled request to submit CalcJob<{node.pk}>') @@ -520,11 +496,9 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override result: plumpy.process_states.State = self process_status = f'Waiting for transport task: {self._command}' - node.set_process_status(process_status) try: - # ? Possibly implement here to keep connection open if self._command == UPLOAD_COMMAND: skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue) if skip_submit: diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 4efedd84ff..f5358489ea 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -54,12 +54,7 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): """:param loop: An asyncio event, will use `asyncio.get_event_loop()` if not supplied""" self._loop = loop if loop is not None else asyncio.get_event_loop() self._transport_requests: Dict[Hashable, TransportRequest] = {} - self._open_transports: Dict[Hashable, Transport] = {} - self._last_open_time = None self._last_close_time = None - self._last_request_special: bool = False - self._close_callback_handle = None - # self._last_transport_request: Dict[Hashable, str] = {} @property def loop(self) -> asyncio.AbstractEventLoop: @@ -81,10 +76,7 @@ async def transport_task(transport_queue, authinfo): :return: A future that can be yielded to give the transport """ open_callback_handle = None - close_callback_handle = None transport_request = self._transport_requests.get(authinfo.pk, None) - # safe_open_interval = transport.get_safe_open_interval() - safe_open_interval = 30 if transport_request is None: # There is no existing request for this transport (i.e. on this authinfo) @@ -92,9 +84,8 @@ async def transport_task(transport_queue, authinfo): self._transport_requests[authinfo.pk] = transport_request transport = authinfo.get_transport() + safe_open_interval = transport.get_safe_open_interval() - # Check here if last_open_time > safe_interval, one could immediately open the transport - # This should be the very first request, after a while def do_open(): """Actually open the transport""" if transport_request.count > 0: @@ -102,8 +93,6 @@ def do_open(): _LOGGER.debug('Transport request opening transport for %s', authinfo) try: transport.open() - self._last_open_time = timezone.localtime(timezone.now()) - self._open_transports[authinfo.pk] = transport except Exception as exception: _LOGGER.error('exception occurred while trying to open transport:\n %s', exception) transport_request.future.set_exception(exception) @@ -120,37 +109,20 @@ def do_open(): # See https://github.com/aiidateam/aiida-core/issues/4698 # First request, submit immediately - # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? - if self._last_request_special: - open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - self._last_request_special = False - - elif self._last_close_time is None: + if self._last_close_time is None: open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) - self._last_request_special = True else: close_timedelta = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() - open_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() - - if open_timedelta > safe_open_interval: - # ! This could also be `_loop.call_soon` which has an implicit delay of 0s - # open_timedelta = close_timedelta-safe_open_interval + if close_timedelta > safe_open_interval: + # If time since last close > `safe_open_interval`, open immediately open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) - self._last_request_special = True else: - # If the last one was a special request, wait the difference between safe_open_interval and lost - open_callback_handle = self._loop.call_later(safe_open_interval-open_timedelta, do_open, context=contextvars.Context()) - - # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + # Otherwise, wait only the difference required until the `safe_open_interval` is reached + open_callback_handle = self._loop.call_later(safe_open_interval-close_timedelta, do_open, context=contextvars.Context()) - # ? This logic is implemented in `tasks.py` instead. - # else: - # transport = authinfo.get_transport() - # return transport - # If transport_request is open already try: transport_request.count += 1 yield transport_request.future @@ -169,39 +141,10 @@ def do_open(): if transport_request.count == 0: if transport_request.future.done(): - # ? Why is all this logic in the `request_transport` method? - # ? Shouldn't the logic to close a transport be outside, such that the transport is being closed - # ? once it was actually used??? - pass - # def do_close(): - # """Close the transport if conditions are met.""" - # transport_request.future.result().close() - # self._last_close_time = timezone.localtime(timezone.now()) - - # close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() - - # if close_timedelta < safe_open_interval: - - # Also here logic when transport should be closed immediately, or when via call_later? - # self._last_close_time = timezone.localtime(timezone.now()) - # self._transport_requests.pop(authinfo.pk, None) - # close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) - # if close_timedelta > safe_open_interval: - # close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) - # self._last_close_time = timezone.localtime(timezone.now()) - # self._transport_requests.pop(authinfo.pk, None) - # self._transport_requests.pop(authinfo.pk, None) - - # transport_request.transport_closer = close_callback_handle - - # This should be replaced with the call_later close_callback_handle invocation - # transport_request.future.result().close() - # ? When should the transport_request be popped? - # ? If it is always popped as soon as the task is done, there is no way to re-use it... - # self._transport_requests.pop(authinfo.pk, None) + _LOGGER.debug('Transport request closing transport for %s', authinfo) + transport_request.future.result().close() elif open_callback_handle is not None: open_callback_handle.cancel() - # ? Somewhere I still need to `pop` the transport_request... or do I? - # self._transport_requests.pop(authinfo.pk, None) + self._transport_requests.pop(authinfo.pk, None)