Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue1346 deal with change in semantics for queue declares from previous versions. #1349

Merged
merged 5 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/source/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,9 @@ The broker option tell each component which broker to contact.
Once connected to an AMQP broker, the user needs to bind a queue
to exchanges and topics to determine the notification messages of interest.

This *subtopic* option should appear after the *broker* setting in files
for the setting to apply to a given queue.


bufSize <size> (default: 1MB)
-----------------------------
Expand Down Expand Up @@ -742,6 +745,9 @@ The queue will be written to and recovered from disk if the broker is restarted.
Note: only *persistent* messages will remain in a durable queue after a broker restart.
Persistent messages can be published by enabling the **persistent** option (it is enabled by default).

This *subtopic* option should appear after the *durable* setting in files
for the setting to apply to a given queue.

fileEvents <event,event,...>
----------------------------

Expand Down Expand Up @@ -781,6 +787,9 @@ are uptodate. If the exchange already exists, this flag can be set to False,
so no attempt to exchange the queue is made, or it´s bindings.
These options are useful on brokers that do not permit users to declare their exchanges.

This *subtopic* option should appear after the *exchangeDeclare* setting in files
for the topic bindings to apply to the given queue.



expire <duration> (default: 5m == five minutes. RECOMMEND OVERRIDING)
Expand All @@ -803,6 +812,9 @@ The default is set low because it defines how long resources on the broker will
and in early use (when default was 1 week) brokers would often get overloaded with very
long queues for left-over experiments.

This *subtopic* option should appear after the *expire* setting in files
for the topic bindings to apply to the given queue.


filename <keyword> (default:None)
-----------------------------------
Expand Down Expand Up @@ -1531,6 +1543,9 @@ optimal load sharing, the prefetch should be set as low as possible. However, o
haul links, it is necessary to raise this number, to hide round-trip latency, so a setting
of 10 or more may be needed.

This *subtopic* option should appear after the *prefetch* setting in files
for the setting to apply to a given queue.

queueBind
---------

Expand All @@ -1540,6 +1555,9 @@ are uptodate. If the queue already exists, These flags can be
set to False, so no attempt to declare the queue is made, or it´s bindings.
These options are useful on brokers that do not permit users to declare their queues.

This *subtopic* option should appear after the *queueBind* setting in files
for the setting to apply to a given queue.

queueDeclare
------------

Expand All @@ -1549,6 +1567,9 @@ are uptodate. If the queue already exists, These flags can be
set to False, so no attempt to declare the queue is made, or it´s bindings.
These options are useful on brokers that do not permit users to declare their queues.

This *subtopic* option should appear after the *queueDeclare* setting in files
for the setting to apply to a given queue.

queueName|queue|queue_name|qn
-----------------------------

Expand Down
25 changes: 21 additions & 4 deletions docs/source/fr/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ dans Sarracenia a priorité par rapport à une variable du même nom dans l’en
Notez que les paramètres de *flatten* peuvent être modifiés entre les options de *directory*.


Substitutions Compatible Sundew
-------------------------------
Substitutions compatibles avec Sundew
-------------------------------------

Dans `MetPX Sundew <../Explication/Glossary.html#sundew>`_, le format de la nomination de fichier est beaucoup plus
stricte, et est spécialisée pour une utilisation aves les données du World Meteorological Organization (WMO).
Expand Down Expand Up @@ -518,7 +518,7 @@ optimisé en n’envoyant que les pièces qui ont changé.
L’option *outlet* permet à la sortie finale d’être autre qu’un poste.
Voir `sr3_cpump(1) <sr3_cpump.1.html>`_ pour plus de détails.

Broker
broker
------

**broker [amqp|mqtt]{s}://<utilisateur>:<mot-de-passe>@<hoteDuCourtier>[:port]/<vhost>**
Expand All @@ -545,6 +545,10 @@ L’option broker indique à chaque composant quel courtier contacter.
Une fois connecté à un courtier AMQP, l’utilisateur doit lier une fil d’attente
aux échanges et aux thèmes pour déterminer le messages d'annonce en question.

l´option *subtopic* devrait apparaître après le paramètre *broker* dans les fichiers
pour que les liaisons de sujet s'appliquent à la file d'attente spécifié.


bufSize <size> (défaut: 1m)
---------------------------

Expand Down Expand Up @@ -735,6 +739,8 @@ Cela signifie que la fil d’attente est sur le disque si le courtier est redém
Remarque: seuls les messages *persistants* resteront dans une file d'attente durable après le redémarrage du courtier.
Les messages persistants peuvent être publiés en activant l'option **persistant** (elle est activée par défaut).

l´option *subtopic* devrait apparaître après le paramètre *durable* dans les fichiers
pour que les liaisons de sujet s'appliquent à la file d'attente spécifié.

fileEvents <évènement, évènement,...>
-------------------------------------
Expand Down Expand Up @@ -794,6 +800,9 @@ Le défaut est défini par une valeur basse car il définit combien de temps les
assigné au courtier, et dans les premières utilisations (lorsque le défaut était de de 1 semaine), les courtiers
étaient souvent surchargés de très longues files d’attente pour les tests restants.

l´option *subtopic* devrait apparaître après le paramètre *expire* dans les fichiers
pour que les liaisons de sujet s'appliquent à la file d'attente spécifié.


filename <mots-clé> (défaut:None)
-----------------------------------
Expand Down Expand Up @@ -1529,13 +1538,21 @@ Si la fil d’attente existe déjà, ces indicateurs peuvent être défini a Fal
ne soit effectuée pour fil d’attente ou pour ses liaisons. Ces options sont utiles sur les courtiers qui ne
permettent pas aux utilisateurs de déclarer leurs files d’attente.

l´option *subtopic* devrait apparaître après le paramètre *queueBind* dans les fichiers
pour que les liaisons de sujet s'appliquent à la file d'attente spécifié.


queueDeclare <flag> (défaut: True)
----------------------------------

Avec l´option queueDeclare à *True*, un composant déclare un fil d´attente pour accumuler des messages d'annonce lors
de chaque démarrage. Des fois les permissions sont restrictifs sur les courtiers, alors on ne peut pas
faire de tels déclarations de ressources. Dans ce cas, il faut supprimer cette déclaration.

l´option *subtopic* devrait apparaître après le paramètre *queueDeclare* dans les fichiers
pour que les liaisons de sujet s'appliquent à la file d'attente spécifié.


queueName|queue|queue_name|qn
-----------------------------

Expand Down Expand Up @@ -1569,7 +1586,7 @@ Les instances démarrées sur n’importe quel nœud ayant accès au même fichi
même fil d’attente. Certains voudront peut-être utiliser l’option *queueName* comme méthode plus explicite
de partager le travail sur plusieurs nœuds. Il est pourtant recommandé d´utiliser queueShare a cette fin.

l´option *subtopic* devrait apparaître après le paramètre queueName dans les fichiers
l´option *subtopic* devrait apparaître après le paramètre *queueName* dans les fichiers
pour que les liaisons de sujet s'appliquent à la file d'attente spécifié.


Expand Down
9 changes: 9 additions & 0 deletions sarracenia/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ def __repr__(self) -> str:

perm_options = [ 'permDefault', 'permDirDefault','permLog']

# options that apply to queues, and so must appear before subtopic resolves queues characteristics.
#
queue_options = [ 'auto_delete', 'broker', 'durable', 'exchange', 'exchangeSuffix', 'expire', 'prefetch', \
'qos', 'queueBind', 'queueDeclare' ]

size_options = ['accelThreshold', 'blockSize', 'bufSize', 'byteRateMax', 'fileSizeMax', 'inlineByteMax']

str_options = [
Expand Down Expand Up @@ -1626,6 +1631,10 @@ def parse_line(self, component, cfg, cfname, lineno, l ):
self.logEvents = self.logEvents | set(['nodupe'])
return

if k in queue_options and self.subtopic_seen:
logger.warning( f"{','.join(self.files)}:{lineno} {k} needs to appear before *subtopic*" \
" unless you need different queues to have different settings")

if len(line) < 2:
logger.error( f"{','.join(self.files)}:{lineno} {k} missing argument(s)" )
return
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/config/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self, options, queueName, subtopic):
self['bindings'] = [ { 'exchange': options.exchange, 'prefix': options.topicPrefix, 'sub': subtopic } ]

self['queue']={ 'name': queueName, 'cleanup_needed': None }
for a in [ 'auto_delete', 'durable', 'expire', 'message_ttl', 'prefetch', 'qos', 'queueBind', 'queueDeclare' ]:
for a in [ 'auto_delete', 'durable', 'expire', 'prefetch', 'qos', 'queueBind', 'queueDeclare' ]:
aa = a.replace('queue','').lower()
if hasattr(options, a) and getattr(options,a):
self['queue'][aa] = getattr(options,a)
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def _queueDeclare(self,passive=False) -> int:
x = int(self.o['messageAgeMax'] * 1000)
if x > 0: args['x-message-ttl'] = x

#FIXME: conver expire, message_ttl to proper units.
#FIXME: convert expire, message_ttl to proper units.
if self.o['dry_run']:
logger.info('queue declare (dry run) %s (as: %s) ' %
(self.o['queueName'], broker_str))
Expand Down
26 changes: 20 additions & 6 deletions sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2803,19 +2803,24 @@ def convert1(self,cfg):
v3_cfg.write('#topicCopy on\n')

if component in [ 'sarra', 'sender', 'subscribe' ]:
v3_cfg.write('#v2 sftp handling is always absolute, sr3 is relative. might need this, remove when all sr3:\n')
v3_cfg.write(' \n#\n#v2 sftp handling is always absolute, sr3 is relative. might need this, remove when all sr3:\n')
v3_cfg.write('#flowcb accept.sftp_absolute\n')

queueName=None

#1st prep pass (for cases when re-ordering needed.)
already_moved=False
verbs_to_move = [ 'auto_delete', 'durable', 'expire', 'message_ttl', 'prefetch', \
'qos', 'queueBind', 'exchangeDeclare' ]


with open(v2_config_path, 'r') as v2_cfg:
for line in v2_cfg.readlines():
if len(line.strip()) < 1:
for input_line in v2_cfg.readlines():
if len(input_line.strip()) < 1:
continue
if line[0].startswith('#'):
if input_line[0].startswith('#'):
continue
line = line.strip().split()
line = input_line.strip().split()
k = line[0]
if k in synonyms:
k = synonyms[k]
Expand All @@ -2825,9 +2830,15 @@ def convert1(self,cfg):
inflight_seen=True
if k in [ 'queueName' ]:
queueName=line[1]
if k in verbs_to_move:
if not already_moved:
v3_cfg.write(f" \n#\n#Move formerly global options ({','.join(verbs_to_move)} to start of config file.\n")
alread_moved=True
v3_cfg.write(input_line+"\n")


if not inflight_seen and post_broker_seen and component in [ 'sarra', 'sender', 'subscribe' ]:
v3_cfg.write('#sr3 inflight defaults to None, v2 defaulted to .tmp when post_broker set.\n')
v3_cfg.write(' \n#\n#sr3 inflight defaults to None, v2 defaulted to .tmp when post_broker set.\n')
v3_cfg.write('inflight .tmp\n')

#2nd re-write pass.
Expand All @@ -2842,6 +2853,9 @@ def convert1(self,cfg):
continue
line = line.strip().split()
k = line[0]

if k in verbs_to_move: # moved to start of file.
continue
if k in synonyms:
k = synonyms[k]

Expand Down
Loading