Skip to content

Commit

Permalink
Clean up from stash.
Browse files Browse the repository at this point in the history
  • Loading branch information
GeigerJ2 committed Oct 28, 2024
1 parent 938f2cd commit d2220d3
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 131 deletions.
8 changes: 0 additions & 8 deletions src/aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@
logger = logging.getLogger(__name__)


## def log_to_file(message):
# current_time = datetime.now().strftime("%H:%M:%S")
# with open("/home/geiger_j/aiida_projects/aiida-dev/ssh-alive-testing/transport-log.txt", "a") as f:
# f.write(f"{current_time}: {message}(tasks.py)\n")


class PreSubmitException(Exception): # noqa: N818
"""Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`."""

Expand Down Expand Up @@ -503,8 +497,6 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override

process_status = f'Waiting for transport task: {self._command}'

# ## log_to_file(f'TRANSPORT_QUEUE: {transport_queue}')

node.set_process_status(process_status)

try:
Expand Down
160 changes: 37 additions & 123 deletions src/aiida/engine/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@

_LOGGER = logging.getLogger(__name__)

# Open and append to the log file at different points

# Open and append to the log file, prepending the current time
## def log_to_file(message):
# datetime.now()
# current_time = datetime.now().strftime("%H:%M:%S")
# with open("/home/geiger_j/aiida_projects/aiida-dev/ssh-alive-testing/transport-log.txt", "a") as f:
# f.write(f"{current_time}: {message} (transports.py)\n")


class TransportRequest:
"""Information kept about request for a transport object"""
Expand All @@ -41,7 +32,8 @@ def __init__(self):
super().__init__()
self.future: asyncio.Future = asyncio.Future()
self.count = 0
self.transport_closer = None
# ? What do I need this for?
# self.transport_closer = None


class TransportQueue:
Expand All @@ -60,10 +52,10 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None):
self._loop = loop if loop is not None else asyncio.get_event_loop()
self._transport_requests: Dict[Hashable, TransportRequest] = {}
self._last_open_time = None
# self._last_submission_time = None
self._last_close_time = None
self._last_request_special: bool = False
# self._last_submission_time = None
# self._last_transport_request: Dict[Hashable, str] = {}
# self._was_last_request_special: bool = False

@property
def loop(self) -> asyncio.AbstractEventLoop:
Expand All @@ -85,11 +77,8 @@ async def transport_task(transport_queue, authinfo):
:return: A future that can be yielded to give the transport
"""
open_callback_handle = None
close_callback_handle = None
# close_callback_handle = None
transport_request = self._transport_requests.get(authinfo.pk, None)
# ## log_to_file(f'EventLoop: {asyncio.all_tasks(self.loop)}')

# ## log_to_file(f'transport_request: {transport_request}')

if transport_request is None:
# There is no existing request for this transport (i.e. on this authinfo)
Expand All @@ -102,7 +91,6 @@ async def transport_task(transport_queue, authinfo):

# Check here if last_open_time > safe_interval, one could immediately open the transport
# This should be the very first request, after a while
## log_to_file(f'OPEN_CALLBACK_HANDLE BEFORE DO_OPEN: {open_callback_handle}')
def do_open():
"""Actually open the transport"""
if transport_request.count > 0:
Expand All @@ -111,7 +99,6 @@ def do_open():
try:
transport.open()
self._last_open_time = datetime.now()
## log_to_file(f'LAST_OPEN_TIME: {self._last_open_time}')
except Exception as exception:
_LOGGER.error('exception occurred while trying to open transport:\n %s', exception)
transport_request.future.set_exception(exception)
Expand All @@ -127,87 +114,45 @@ def do_open():
# to this handle would otherwise keep the Process context (and thus the process itself) in memory.
# See https://github.com/aiidateam/aiida-core/issues/4698

# Pseudocode
# get_metadata from authinfo
# see if there is a last_close, if not None, compute seconds from then to now
# if < safe_interval, wait for difference
# if larger, call call_later, open call_later but with 0
metadata = authinfo.get_metadata()
last_close_time = metadata.get('last_close_time')

log_file_path = '/home/geiger_j/aiida_projects/thor-dev/transport-log.txt'
# if self._last_request_special:
# open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context())
# self._last_request_special = False

debug_info = '\nDEBUG START\n'

# if last_close_time is None:
# # Submit immediately -> This is not ever being triggered
# # First request, submit immediately
# # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task?
# if self._last_close_time is None:
# open_callback_handle = self._loop.call_later(1, do_open, context=contextvars.Context())
# self._last_request_special = True

# debug_info += (
# f"LAST_CLOSE_TIME_NONE: submit immediately\n"
# f"LAST_CLOSE_TIME: {last_close_time}\n"
# f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n"
# )

last_close_time = datetime.strptime(last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z')
timedelta_seconds = (timezone.localtime(timezone.now()) - last_close_time).total_seconds()

if timedelta_seconds > safe_open_interval:
debug_info += (
f'TIMEDELTA > SAFE_OPEN_INTERVAL: submit immediately\n' f'LAST_CLOSE_TIME: {last_close_time}\n'
# f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n"
)
# # self._last_close_time = datetime.strptime(self._last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z')
# else:
# timedelta_seconds = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds()

open_callback_handle = self._loop.call_later(0, do_open, context=contextvars.Context())
# self._was_last_request_special = True
# if timedelta_seconds > safe_open_interval:
# # ! This could also be `_loop.call_soon` which has an implicit delay of 0s

else:
# If the last one was a special request, wait the safe_open_interval
debug_info += (
f'TIMEDELTA < SAFE_OPEN_INTERVAL and last request special: submit after safe_open_interval\n'
f'LAST_CLOSE_TIME: {last_close_time}\n'
# f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n"
)
# open_callback_handle = self._loop.call_later(timedelta_seconds-safe_open_interval, do_open, context=contextvars.Context())
# self._last_request_special = True

open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context())
# else:
# # If the last one was a special request, wait the safe_open_interval
# open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context())

# if self._was_last_request_special:

# # If the last one was a special request, wait the safe_open_interval
# debug_info += (
# f"TIMEDELTA < SAFE_OPEN_INTERVAL and last request special: submit after safe_open_interval\n"
# f"LAST_CLOSE_TIME: {last_close_time}\n"
# f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n"
# )
# if self._last_request_special:

# open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context())
# self._was_last_request_special = False
# self._last_request_special = False

# else:

# # This is also a special request
# # Or, should it be? Could also remove the if/else, and just wait the safe_open_interval, as is the default
# debug_info += (
# f"TIMEDELTA < SAFE_OPEN_INTERVAL and last request not special: submit after timedelta\n"
# f"LAST_CLOSE_TIME: {last_close_time}\n"
# f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n"
# f"TIMEDELTA: {timedelta_seconds}\n"
# )

# open_callback_handle = self._loop.call_later(timedelta_seconds, do_open, context=contextvars.Context())
# self._was_last_request_special = True
# self._last_request_special = True
# open_callback_handle = self._loop.call_later(5, do_open, context=contextvars.Context())

# open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context())

with open(log_file_path, 'a') as log_file:
log_file.write(debug_info)

try:
transport_request.count += 1
self._last_submission_time = datetime.now()
## log_to_file(f'LAST_SUBMISSION_TIME: {self._last_submission_time}')
## log_to_file(f'TRANSPORT_REQUEST_COUNT: {transport_request.count}')
yield transport_request.future
except asyncio.CancelledError:
# note this is only required in python<=3.7,
Expand All @@ -218,60 +163,29 @@ def do_open():
_LOGGER.error('Exception whilst using transport:\n%s', traceback.format_exc())
raise
finally:
## log_to_file(f'FINALLY BLOCK - TRANSPORT_REQUEST.COUNT: {transport_request.count}')
transport_request.count -= 1
assert transport_request.count >= 0, 'Transport request count dropped below 0!'
# Check if there are no longer any users that want the transport
if transport_request.count == 0:
## log_to_file(f'TRANSPORT_REQUEST.FUTURE.DONE(): {transport_request.future.done()}')
if transport_request.future.done():
## log_to_file(f'DATETIME: {(datetime.now() - self._last_open_time).total_seconds() > 5}')

if (datetime.now() - self._last_open_time).total_seconds() > 5:

def close_transport():
transport_request.future.result().close()
"""Close the transport if conditions are met."""
## log_to_file("CLOSE_TRANSPORT")
## log_to_file(f"LAST_CLOSE_TIME: {self._last_close_time}")

close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context())
transport_request.transport_closer = close_callback_handle
# transport_request.transport_closer = None
# else:
# If not yet time to close, schedule again
# close_callback_handle = self._loop.call_later(
# 5, close_transport, context=contextvars.Context())
# transport_request.transport_closer = close_callback_handle

# ## log_to_file(f"TRANSPORT_REQUEST.TRANSPORT_CLOSER: {transport_request.transport_closer}")
# if transport_request.transport_closer is None:
# ## log_to_file("INSIDE")
# self._last_close_time = datetime.now()
# else:
# return
# if (datetime.now() - self._last_open_time).total_seconds() > 5:

# This should be replaced with the call_later close_callback_handle invocation
# ## log_to_file(f"TRANSPORT_REQUEST.TRANSPORT_CLOSER: {transport_request.transport_closer}")
# def close_transport():
# """Close the transport if conditions are met."""
# transport_request.future.result().close()

transport_request.future.result().close()
# Also here logic when transport should be closed immediately, or when via call_later?
# close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context())
# transport_request.transport_closer = close_callback_handle

# Get old metadata from authinfo, and set variable last_close_time to datetime now in isoformat
# Need to convert to str, otherwise not json-serializable when setting authinfo metadata
# if self._was_last_request_special is True:
last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z')
authinfo.set_metadata({'last_close_time': last_close_time})
# else:
# # asyncio.sleep(5)
# transport_request.count += 1
# self._was_last_request_special = True
# yield transport_request.future
# This should be replaced with the call_later close_callback_handle invocation
transport_request.future.result().close()
# When storing in `AuthInfo`, had to convert to str to be storeable in the DB
# self._last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z')
self._last_close_time = timezone.localtime(timezone.now())

elif open_callback_handle is not None:
open_callback_handle.cancel()

self._transport_requests.pop(authinfo.pk, None)


# Should wait first time 0, then always ~30
# Try out with manual waiting times in between

0 comments on commit d2220d3

Please sign in to comment.