diff --git a/src/aiida/brokers/broker.py b/src/aiida/brokers/broker.py index df8e628a2..941c69833 100644 --- a/src/aiida/brokers/broker.py +++ b/src/aiida/brokers/broker.py @@ -3,8 +3,10 @@ import abc import typing as t + if t.TYPE_CHECKING: from aiida.manage.configuration.profile import Profile + from plumpy.coordinator import Coordinator __all__ = ('Broker',) @@ -20,11 +22,7 @@ def __init__(self, profile: 'Profile') -> None: self._profile = profile @abc.abstractmethod - def get_communicator(self): - """Return an instance of :class:`kiwipy.Communicator`.""" - - @abc.abstractmethod - def get_coordinator(self): + def get_coordinator(self) -> 'Coordinator': """Return an instance of coordinator.""" @abc.abstractmethod diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index 8096747be..370afc6ac 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -31,7 +31,7 @@ def __init__(self, profile: Profile) -> None: :param profile: The profile. """ self._profile = profile - self._communicator: 'RmqThreadCommunicator' | None = None + self._communicator: 'RmqThreadCommunicator | None' = None self._prefix = f'aiida-{self._profile.uuid}' def __str__(self): @@ -48,19 +48,16 @@ def close(self): def iterate_tasks(self): """Return an iterator over the tasks in the launch queue.""" - for task in self.get_communicator().task_queue(get_launch_queue_name(self._prefix)): + for task in self.get_coordinator().communicator.task_queue(get_launch_queue_name(self._prefix)): yield task - def get_communicator(self) -> 'RmqThreadCommunicator': + def get_coordinator(self): if self._communicator is None: self._communicator = self._create_communicator() # Check whether a compatible version of RabbitMQ is being used. self.check_rabbitmq_version() - return self._communicator - - def get_coordinator(self): - coordinator = RmqCoordinator(self.get_communicator()) + coordinator = RmqCoordinator(self._communicator) return coordinator @@ -70,7 +67,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator': from aiida.orm.utils import serialize - self._communicator = RmqThreadCommunicator.connect( + _communicator = RmqThreadCommunicator.connect( connection_params={'url': self.get_url()}, message_exchange=get_message_exchange_name(self._prefix), encoder=functools.partial(serialize.serialize, encoding='utf-8'), @@ -84,7 +81,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator': testing_mode=self._profile.is_test_profile, ) - return self._communicator + return _communicator def check_rabbitmq_version(self): """Check the version of RabbitMQ that is being connected to and emit warning if it is not compatible.""" @@ -128,4 +125,4 @@ def get_rabbitmq_version(self): """ from packaging.version import parse - return parse(self.get_communicator().server_properties['version']) + return parse(self.get_coordinator().communicator.server_properties['version']) diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index 9f8ae1646..a4e665a54 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -10,6 +10,7 @@ import click +from aiida.brokers.broker import Broker from aiida.cmdline.commands.cmd_verdi import verdi from aiida.cmdline.params import arguments, options, types from aiida.cmdline.utils import decorators, echo @@ -416,7 +417,7 @@ def process_play(processes, all_entries, timeout, wait): @decorators.with_dbenv() @decorators.with_broker @decorators.only_if_daemon_running(echo.echo_warning, 'daemon is not running, so process may not be reachable') -def process_watch(broker, processes, most_recent_node): +def process_watch(broker: Broker, processes, most_recent_node): """Watch the state transitions of processes. Watch the state transitions for one or multiple running processes.""" @@ -436,7 +437,7 @@ def process_watch(broker, processes, most_recent_node): from kiwipy import BroadcastFilter - def _print(communicator, body, sender, subject, correlation_id): + def _print(coordinator, body, sender, subject, correlation_id): """Format the incoming broadcast data into a message and echo it to stdout.""" if body is None: body = 'No message specified' @@ -446,7 +447,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo(f'Process<{sender}> [{subject}|{correlation_id}]: {body}') - communicator = broker.get_communicator() + coordinator = broker.get_coordinator() echo.echo_report('watching for broadcasted messages, press CTRL+C to stop...') if most_recent_node: @@ -457,7 +458,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo_error(f'Process<{process.pk}> is already terminated') continue - communicator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk)) + coordinator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk)) try: # Block this thread indefinitely until interrupt @@ -467,7 +468,7 @@ def _print(communicator, body, sender, subject, correlation_id): echo.echo('') # add a new line after the interrupt character echo.echo_report('received interrupt, exiting...') try: - communicator.close() + coordinator.close() except RuntimeError: pass diff --git a/src/aiida/cmdline/commands/cmd_rabbitmq.py b/src/aiida/cmdline/commands/cmd_rabbitmq.py index c6a66d6da..f97d22742 100644 --- a/src/aiida/cmdline/commands/cmd_rabbitmq.py +++ b/src/aiida/cmdline/commands/cmd_rabbitmq.py @@ -20,6 +20,7 @@ from aiida.cmdline.commands.cmd_devel import verdi_devel from aiida.cmdline.params import arguments, options from aiida.cmdline.utils import decorators, echo, echo_tabulate +from aiida.manage.manager import Manager if t.TYPE_CHECKING: import requests @@ -131,7 +132,7 @@ def with_client(ctx, wrapped, _, args, kwargs): @cmd_rabbitmq.command('server-properties') @decorators.with_manager -def cmd_server_properties(manager): +def cmd_server_properties(manager: Manager): """List the server properties.""" import yaml diff --git a/src/aiida/cmdline/commands/cmd_status.py b/src/aiida/cmdline/commands/cmd_status.py index 85ef292fa..6ee1952fb 100644 --- a/src/aiida/cmdline/commands/cmd_status.py +++ b/src/aiida/cmdline/commands/cmd_status.py @@ -132,7 +132,7 @@ def verdi_status(print_traceback, no_rmq): if broker: try: - broker.get_communicator() + broker.get_coordinator() except Exception as exc: message = f'Unable to connect to broker: {broker}' print_status(ServiceStatus.ERROR, 'broker', message, exception=exc, print_traceback=print_traceback) diff --git a/src/aiida/engine/processes/futures.py b/src/aiida/engine/processes/futures.py index 096c11b27..79016dacc 100644 --- a/src/aiida/engine/processes/futures.py +++ b/src/aiida/engine/processes/futures.py @@ -12,6 +12,7 @@ from typing import Optional, Union import kiwipy +from plumpy.coordinator import Coordinator from aiida.orm import Node, load_node @@ -28,17 +29,17 @@ def __init__( pk: int, loop: Optional[asyncio.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, - communicator: Optional[kiwipy.Communicator] = None, + coordinator: Optional[Coordinator] = None, ): """Construct a future for a process node being finished. If a None poll_interval is supplied polling will not be used. - If a communicator is supplied it will be used to listen for broadcast messages. + If a coordinator is supplied it will be used to listen for broadcast messages. :param pk: process pk :param loop: An event loop :param poll_interval: optional polling interval, if None, polling is not activated. - :param communicator: optional communicator, if None, will not subscribe to broadcasts. + :param coordinator: optional coordinator, if None, will not subscribe to broadcasts. """ from .process import ProcessState @@ -46,18 +47,18 @@ def __init__( loop = loop if loop is not None else asyncio.get_event_loop() super().__init__(loop=loop) - assert not (poll_interval is None and communicator is None), 'Must poll or have a communicator to use' + assert not (poll_interval is None and coordinator is None), 'Must poll or have a coordinator to use' node = load_node(pk=pk) if node.is_terminated: self.set_result(node) else: - self._communicator = communicator + self._coordinator = coordinator self.add_done_callback(lambda _: self.cleanup()) # Try setting up a filtered broadcast subscriber - if self._communicator is not None: + if self._coordinator is not None: def _subscriber(*args, **kwargs): if not self.done(): @@ -66,17 +67,17 @@ def _subscriber(*args, **kwargs): broadcast_filter = kiwipy.BroadcastFilter(_subscriber, sender=pk) for state in [ProcessState.FINISHED, ProcessState.KILLED, ProcessState.EXCEPTED]: broadcast_filter.add_subject_filter(f'state_changed.*.{state.value}') - self._broadcast_identifier = self._communicator.add_broadcast_subscriber(broadcast_filter) + self._broadcast_identifier = self._coordinator.add_broadcast_subscriber(broadcast_filter) # Start polling if poll_interval is not None: loop.create_task(self._poll_process(node, poll_interval)) def cleanup(self) -> None: - """Clean up the future by removing broadcast subscribers from the communicator if it still exists.""" - if self._communicator is not None: - self._communicator.remove_broadcast_subscriber(self._broadcast_identifier) - self._communicator = None + """Clean up the future by removing broadcast subscribers from the coordinator if it still exists.""" + if self._coordinator is not None: + self._coordinator.remove_broadcast_subscriber(self._broadcast_identifier) + self._coordinator = None self._broadcast_identifier = None async def _poll_process(self, node: Node, poll_interval: Union[int, float]) -> None: diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index 6cfc7153a..916589ccf 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -326,24 +326,6 @@ def get_persister(self) -> 'AiiDAPersister': return self._persister - def get_communicator(self) -> 'RmqThreadCommunicator': - """Return the communicator - - :return: a global communicator instance - - """ - from aiida.common import ConfigurationError - - broker = self.get_broker() - - if broker is None: - assert self._profile is not None - raise ConfigurationError( - f'profile `{self._profile.name}` does not provide a communicator because it does not define a broker' - ) - - return broker.get_communicator() - def get_coordinator(self) -> 'Coordinator': """Return the coordinator diff --git a/tests/engine/test_futures.py b/tests/engine/test_futures.py index b8ba78aa8..194bcf60c 100644 --- a/tests/engine/test_futures.py +++ b/tests/engine/test_futures.py @@ -31,7 +31,7 @@ def test_calculation_future_broadcasts(self): # No polling future = processes.futures.ProcessFuture( - pk=process.pid, loop=runner.loop, communicator=manager.get_communicator() + pk=process.pid, loop=runner.loop, coordinator=manager.get_coordinator() ) run(process)