Skip to content

Commit

Permalink
messaging: Delay applying exchange/queue name prefix config
Browse files Browse the repository at this point in the history
Exchanges are defined as module level vars initialized on import.
But, oslo_config is not setup yet at import time, so delay applying the
new prefix setting until just before they get created in RabbitMQ.

Luckily, Exchange() objects are lightweight objects that just
hold the exchange name and type. They could do more, but we
don't bind them to a channel. Because the exchange objects
merely hold strings, and because they are basically singletons
(module-level vars), we can safely update the exchange name
just before declaring it in RabbitMQ. From that point on,
everything that uses a singleton exchange object will get
the updated name.
  • Loading branch information
cognifloyd committed Nov 9, 2024
1 parent f103ad5 commit ca88923
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 53 deletions.
1 change: 0 additions & 1 deletion st2common/st2common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ def register_opts(ignore_errors=False):
"prefix",
default="st2",
help="Prefix for all exchange and queue names.",
advanced=True,
),
]

Expand Down
3 changes: 1 addition & 2 deletions st2common/st2common/services/sensor_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import six
from kombu.mixins import ConsumerMixin
from oslo_config import cfg

from st2common import log as logging
from st2common.transport import reactor, publishers
Expand Down Expand Up @@ -127,7 +126,7 @@ def stop(self):
@staticmethod
def _get_queue(queue_suffix):
queue_name = queue_utils.get_queue_name(
queue_name_base=f"{cfg.CONF.messaging.prefix}.sensor.watch",
queue_name_base="st2.sensor.watch",
queue_name_suffix=queue_suffix,
add_random_uuid_to_suffix=True,
)
Expand Down
3 changes: 1 addition & 2 deletions st2common/st2common/services/triggerwatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import six
from kombu.mixins import ConsumerMixin
from oslo_config import cfg

from st2common import log as logging
from st2common.persistence.trigger import Trigger
Expand Down Expand Up @@ -164,7 +163,7 @@ def _load_triggers_from_db(self):
@staticmethod
def _get_queue(queue_suffix, exclusive):
queue_name = queue_utils.get_queue_name(
queue_name_base=f"{cfg.CONF.messaging.prefix}.trigger.watch",
queue_name_base="st2.trigger.watch",
queue_name_suffix=queue_suffix,
add_random_uuid_to_suffix=True,
)
Expand Down
3 changes: 1 addition & 2 deletions st2common/st2common/transport/actionalias.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from __future__ import absolute_import
from kombu import Exchange, Queue
from oslo_config import cfg

from st2common.transport import publishers

Expand All @@ -25,7 +24,7 @@
"get_queue",
]

ACTIONALIAS_XCHG = Exchange(f"{cfg.CONF.messaging.prefix}.actionalias", type="topic")
ACTIONALIAS_XCHG = Exchange("st2.actionalias", type="topic")


class ActionAliasPublisher(publishers.CUDPublisher):
Expand Down
5 changes: 1 addition & 4 deletions st2common/st2common/transport/actionexecutionstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
from __future__ import absolute_import

from kombu import Exchange, Queue
from oslo_config import cfg

from st2common.transport import publishers

__all__ = ["ActionExecutionStatePublisher"]

ACTIONEXECUTIONSTATE_XCHG = Exchange(
f"{cfg.CONF.messaging.prefix}.actionexecutionstate", type="topic"
)
ACTIONEXECUTIONSTATE_XCHG = Exchange("st2.actionexecutionstate", type="topic")


class ActionExecutionStatePublisher(publishers.CUDPublisher):
Expand Down
3 changes: 1 addition & 2 deletions st2common/st2common/transport/announcement.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from __future__ import absolute_import

from kombu import Exchange, Queue
from oslo_config import cfg

from st2common import log as logging
from st2common.constants.trace import TRACE_CONTEXT
Expand All @@ -28,7 +27,7 @@
LOG = logging.getLogger(__name__)

# Exchange for Announcements
ANNOUNCEMENT_XCHG = Exchange(f"{cfg.CONF.messaging.prefix}.announcement", type="topic")
ANNOUNCEMENT_XCHG = Exchange("st2.announcement", type="topic")


class AnnouncementPublisher(object):
Expand Down
17 changes: 10 additions & 7 deletions st2common/st2common/transport/bootstrap_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG
from st2common.transport.announcement import ANNOUNCEMENT_XCHG
from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper
from st2common.transport.execution import EXECUTION_XCHG
from st2common.transport.execution import EXECUTION_XCHG, EXECUTION_OUTPUT_XCHG
from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG
from st2common.transport.reactor import SENSOR_CUD_XCHG
from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG
Expand Down Expand Up @@ -67,6 +67,7 @@
ACTIONEXECUTIONSTATE_XCHG,
ANNOUNCEMENT_XCHG,
EXECUTION_XCHG,
EXECUTION_OUTPUT_XCHG,
LIVEACTION_XCHG,
LIVEACTION_STATUS_MGMT_XCHG,
TRIGGER_CUD_XCHG,
Expand Down Expand Up @@ -94,16 +95,15 @@
WORKFLOW_EXECUTION_RESUME_QUEUE,
# Those queues are dynamically / late created on some class init but we still need to
# pre-declare them for redis Kombu backend to work.
reactor.get_trigger_cud_queue(
name=f"{cfg.CONF.messaging.prefix}.preinit", routing_key="init"
),
reactor.get_sensor_cud_queue(
name=f"{cfg.CONF.messaging.prefix}.preinit", routing_key="init"
),
reactor.get_trigger_cud_queue(name="st2.preinit", routing_key="init"),
reactor.get_sensor_cud_queue(name="st2.preinit", routing_key="init"),
]


def _do_register_exchange(exchange, connection, channel, retry_wrapper):
prefix = cfg.CONF.messaging.prefix
if exchange.name and prefix != "st2":
exchange.name = exchange.name.replace("st2.", f"{prefix}.", 1)
try:
kwargs = {
"exchange": exchange.name,
Expand All @@ -127,6 +127,9 @@ def _do_register_exchange(exchange, connection, channel, retry_wrapper):


def _do_predeclare_queue(channel, queue):
prefix = cfg.CONF.messaging.prefix
if queue.name and prefix != "st2":
queue.name = queue.name.replace("st2.", f"{prefix}.", 1)
LOG.debug('Predeclaring queue for exchange "%s"' % (queue.exchange.name))

bound_queue = None
Expand Down
7 changes: 2 additions & 5 deletions st2common/st2common/transport/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from __future__ import absolute_import
from kombu import Exchange, Queue
from oslo_config import cfg

from st2common.transport import publishers

Expand All @@ -28,10 +27,8 @@
"get_output_queue",
]

EXECUTION_XCHG = Exchange(f"{cfg.CONF.messaging.prefix}.execution", type="topic")
EXECUTION_OUTPUT_XCHG = Exchange(
f"{cfg.CONF.messaging.prefix}.execution.output", type="topic"
)
EXECUTION_XCHG = Exchange("st2.execution", type="topic")
EXECUTION_OUTPUT_XCHG = Exchange("st2.execution.output", type="topic")


class ActionExecutionPublisher(publishers.CUDPublisher):
Expand Down
7 changes: 2 additions & 5 deletions st2common/st2common/transport/liveaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
from __future__ import absolute_import

from kombu import Exchange, Queue
from oslo_config import cfg

from st2common.transport import publishers

__all__ = ["LiveActionPublisher", "get_queue", "get_status_management_queue"]


LIVEACTION_XCHG = Exchange(f"{cfg.CONF.messaging.prefix}.liveaction", type="topic")
LIVEACTION_STATUS_MGMT_XCHG = Exchange(
f"{cfg.CONF.messaging.prefix}.liveaction.status", type="topic"
)
LIVEACTION_XCHG = Exchange("st2.liveaction", type="topic")
LIVEACTION_STATUS_MGMT_XCHG = Exchange("st2.liveaction.status", type="topic")


class LiveActionPublisher(publishers.CUDPublisher, publishers.StatePublisherMixin):
Expand Down
23 changes: 11 additions & 12 deletions st2common/st2common/transport/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from __future__ import absolute_import

from kombu import Queue
from oslo_config import cfg

from st2common.constants import action as action_constants
from st2common.transport import actionalias
Expand Down Expand Up @@ -56,49 +55,49 @@

# Used by the action scheduler service
ACTIONSCHEDULER_REQUEST_QUEUE = liveaction.get_status_management_queue(
f"{cfg.CONF.messaging.prefix}.actionrunner.req",
"st2.actionrunner.req",
routing_key=action_constants.LIVEACTION_STATUS_REQUESTED,
)


# Used by the action runner service
ACTIONRUNNER_WORK_QUEUE = liveaction.get_status_management_queue(
f"{cfg.CONF.messaging.prefix}.actionrunner.work",
"st2.actionrunner.work",
routing_key=action_constants.LIVEACTION_STATUS_SCHEDULED,
)

ACTIONRUNNER_CANCEL_QUEUE = liveaction.get_status_management_queue(
f"{cfg.CONF.messaging.prefix}.actionrunner.cancel",
"st2.actionrunner.cancel",
routing_key=action_constants.LIVEACTION_STATUS_CANCELING,
)

ACTIONRUNNER_PAUSE_QUEUE = liveaction.get_status_management_queue(
f"{cfg.CONF.messaging.prefix}.actionrunner.pause",
"st2.actionrunner.pause",
routing_key=action_constants.LIVEACTION_STATUS_PAUSING,
)

ACTIONRUNNER_RESUME_QUEUE = liveaction.get_status_management_queue(
f"{cfg.CONF.messaging.prefix}.actionrunner.resume",
"st2.actionrunner.resume",
routing_key=action_constants.LIVEACTION_STATUS_RESUMING,
)


# Used by the notifier service
NOTIFIER_ACTIONUPDATE_WORK_QUEUE = execution.get_queue(
f"{cfg.CONF.messaging.prefix}.notifiers.execution.work",
"st2.notifiers.execution.work",
routing_key=publishers.UPDATE_RK,
)


# Used by the results tracker service
RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE = actionexecutionstate.get_queue(
f"{cfg.CONF.messaging.prefix}.resultstracker.work", routing_key=publishers.CREATE_RK
"st2.resultstracker.work", routing_key=publishers.CREATE_RK
)


# Used by the rules engine service
RULESENGINE_WORK_QUEUE = reactor.get_trigger_instances_queue(
name=f"{cfg.CONF.messaging.prefix}.trigger_instances_dispatch.rules_engine",
name="st2.trigger_instances_dispatch.rules_engine",
routing_key="#",
)

Expand Down Expand Up @@ -137,16 +136,16 @@

# Used by the workflow engine service
WORKFLOW_EXECUTION_WORK_QUEUE = workflow.get_status_management_queue(
name=f"{cfg.CONF.messaging.prefix}.workflow.work",
name="st2.workflow.work",
routing_key=action_constants.LIVEACTION_STATUS_REQUESTED,
)

WORKFLOW_EXECUTION_RESUME_QUEUE = workflow.get_status_management_queue(
name=f"{cfg.CONF.messaging.prefix}.workflow.resume",
name="st2.workflow.resume",
routing_key=action_constants.LIVEACTION_STATUS_RESUMING,
)

WORKFLOW_ACTION_EXECUTION_UPDATE_QUEUE = execution.get_queue(
f"{cfg.CONF.messaging.prefix}.workflow.action.update",
"st2.workflow.action.update",
routing_key=publishers.UPDATE_RK,
)
9 changes: 3 additions & 6 deletions st2common/st2common/transport/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from __future__ import absolute_import
from kombu import Exchange, Queue
from oslo_config import cfg

from st2common import log as logging
from st2common.constants.trace import TRACE_CONTEXT
Expand All @@ -34,15 +33,13 @@
LOG = logging.getLogger(__name__)

# Exchange for Trigger CUD events
TRIGGER_CUD_XCHG = Exchange(f"{cfg.CONF.messaging.prefix}.trigger", type="topic")
TRIGGER_CUD_XCHG = Exchange("st2.trigger", type="topic")

# Exchange for TriggerInstance events
TRIGGER_INSTANCE_XCHG = Exchange(
f"{cfg.CONF.messaging.prefix}.trigger_instances_dispatch", type="topic"
)
TRIGGER_INSTANCE_XCHG = Exchange("st2.trigger_instances_dispatch", type="topic")

# Exchane for Sensor CUD events
SENSOR_CUD_XCHG = Exchange(f"{cfg.CONF.messaging.prefix}.sensor", type="topic")
SENSOR_CUD_XCHG = Exchange("st2.sensor", type="topic")


class SensorCUDPublisher(publishers.CUDPublisher):
Expand Down
7 changes: 2 additions & 5 deletions st2common/st2common/transport/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
from __future__ import absolute_import

import kombu
from oslo_config import cfg

from st2common.transport import publishers

__all__ = ["WorkflowExecutionPublisher", "get_queue", "get_status_management_queue"]

WORKFLOW_EXECUTION_XCHG = kombu.Exchange(
f"{cfg.CONF.messaging.prefix}.workflow", type="topic"
)
WORKFLOW_EXECUTION_XCHG = kombu.Exchange("st2.workflow", type="topic")
WORKFLOW_EXECUTION_STATUS_MGMT_XCHG = kombu.Exchange(
f"{cfg.CONF.messaging.prefix}.workflow.status", type="topic"
"st2.workflow.status", type="topic"
)


Expand Down

0 comments on commit ca88923

Please sign in to comment.