Skip to content

Commit

Permalink
Merge pull request #1276 from MetPX/issue1267
Browse files Browse the repository at this point in the history
Issue1267 queueName not working as expected because of upcoming feature
  • Loading branch information
petersilva authored Oct 25, 2024
2 parents 7bea2c0 + b10f050 commit 2c8c5dc
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 23 deletions.
6 changes: 5 additions & 1 deletion docs/source/How2Guides/UPGRADING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ is perhaps complex. This version will read and write both files,
so as to preserve ability to downgrade. later version will drop
support for qname files.


*CHANGE*: in configuration files: *subtopic* must come after
the relevant queue naming options (queueName, queueShare)
in prior releases, the queue naming was a global setting.
In a future version, one will be able to subscribe to multiple
queues with a single subscriber.

3.0.54
------
Expand Down
37 changes: 25 additions & 12 deletions docs/source/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,9 @@ Instances started on any node with access to the same shared file will use the
same queue. Some may want use the *queueName* option as a more explicit method
of sharing work across multiple nodes.

This *subtopic* option should appear after the queueName setting in files
for the topic bindings to apply to the given queue.


queueShare <str> ( default: ${USER}_${HOSTNAME}_${RAND8} )
----------------------------------------------------------
Expand All @@ -1567,6 +1570,9 @@ will result in a random 8 digit number being appended to the queue name.
All the instances within the configuration with access to the same state directory
will use the queue name thus defined.

This *subtopic* option should appear after the queueShare setting in files
for the topic bindings to apply to the given queue.


randomize <flag>
----------------
Expand Down Expand Up @@ -1973,8 +1979,8 @@ message flows.
subtopic <amqp pattern> (default: #)
------------------------------------

Within an exchange's postings, the subtopic setting narrows the product selection.
To give a correct value to the subtopic,
Within an exchange's postings, the subtopic setting narrows the product selection,
for objects to place in the currently selected queue. To give a correct value to the subtopic,
one has the choice of filtering using **subtopic** with only AMQP's limited wildcarding and
length limited to 255 encoded bytes, or the more powerful regular expression
based **accept/reject** mechanisms described below. The difference being that the
Expand All @@ -1983,18 +1989,18 @@ to the client at all. The **accept/reject** patterns apply to messages sent by
broker to the subscriber. In other words, **accept/reject** are client side filters,
whereas **subtopic** is server side filtering.

It is best practice to use server side filtering to reduce the number of notification messages sent
to the client to a small superset of what is relevant, and perform only a fine-tuning with the
client side mechanisms, saving bandwidth and processing for all.
Use server side filtering to reduce the number of notification messages sent
to the client to a small superset of what is relevant, and refine further with the
client side accept/reject, saving bandwidth and processing for all.

topicPrefix is primarily of interest during protocol version transitions,
where one wishes to specify a non-default protocol version of messages to
subscribe to.
Often, the user specifies one exchange, and several subtopic options.
**Subtopic** is what is normally used to indicate messages of interest
for a given queue. If needed, queueName, and/or queueShare need to be
earlier in the configuration file for the subtopic to apply to the selected queue.

Usually, the user specifies one exchange, and several subtopic options.
**Subtopic** is what is normally used to indicate messages of interest.
To use the subtopic to filter the products, match the subtopic string with
the relative path of the product.
the relative path of the product (non Sarracenia pumps may have different
topic hierarchy conventions.)

For example, consuming from DD, to give a correct value to subtopic, one can
browse the our website **http://dd.weather.gc.ca** and write down all directories
Expand Down Expand Up @@ -2084,7 +2090,6 @@ Sarracenia has a convention for how topics for products should be organized. The
a topicPrefix, followed by subtopics derived from the *relPath* field of the message.
Some networks may choose to use different topic conventions, external to sarracenia.


timeout <interval> (default: 0)
-------------------------------

Expand Down Expand Up @@ -2134,6 +2139,14 @@ prepended to the sub-topic to form a complete topic hierarchy.
This option applies to subscription bindings.
Denotes the version of messages received in the sub-topics. (v03 refers to `<sr3_post.7.html>`_)

topicPrefix is primarily of interest during protocol version transitions,
where one wishes to specify a non-default protocol version of messages to
subscribe to.

For example, Sr3 expects v03 messages by default, but there are
plenty of sources that offer the old version (requiring a topicPrefix of *v02.post*)
to specify the old version of messages.

users <flag> (default: false)
-----------------------------

Expand Down
4 changes: 4 additions & 0 deletions docs/source/fr/CommentFaire/MiseANiveau.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ est peut-être complexe. Cette version lira et écrira les deux fichiers,
afin de préserver la possibilité de rétrogradation. La version ultérieure abandonnera
la prise en charge des fichiers qname.
*CHANGEMENT* : dans les fichiers de configuration : *subtopic* doit venir après
la spécification de nom de file d'attente pertinentes (queueName, queueShare.)
Dans les versions précédentes, la dénomination de la file d'attente était un paramètre global.
Dans une future version, on pourra s'abonner à plusieurs files d'attente avec un seul abonné.

3.0.54
------
Expand Down
26 changes: 21 additions & 5 deletions docs/source/fr/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,8 @@ Les instances démarrées sur n’importe quel nœud ayant accès au même fichi
même fil d’attente. Certains voudront peut-être utiliser l’option *queueName* comme méthode plus explicite
de partager le travail sur plusieurs nœuds. Il est pourtant recommandé d´utiliser queueShare a cette fin.

l´option *subtopic* devrait apparaître après le paramètre queueName dans les fichiers
pour que les liaisons de sujet s'appliquent à la file d'attente spécifié.


queueShare <str> (default: ${USER}_${HOSTNAME}_${RAND8} )
Expand All @@ -1558,6 +1560,8 @@ Ce entraînera l'ajout d'un nombre aléatoire à 8 chiffres au nom de la file d'
Toutes les instances de la configuration ayant accès au même répertoire d'état
utilisera le nom de file d'attente ainsi défini.

l´option *subtopic* devrait apparaître après le paramètre queueShare dans les fichiers
pour que les liaisons de sujet s'appliquent à la file d'attente spécifié.

randomize <flag>
----------------
Expand Down Expand Up @@ -1965,7 +1969,8 @@ origine. À utiliser uniquement avec des flux de données fiables et organisés
subtopic <modèle amqp> (défaut: #)
-----------------------------------

Dans les publications d’un échange, le paramètre de subtopic restreint la sélection du produit.
Dans les publications d’un échange, le paramètre de subtopic sert à préciser des messages
à placer dans la file d'attente actuellement sélectionnée.
Pour donner la bonne valeur au subtopic, on a le choix de filtrer en utilisant **subtopic** seulement avec le
wildcarding limité d’AMQP et une longueur limitée à 255 octets encodés, ou de manière plus puissante, les expressions régulière
basés sur les mécanismes **accept/reject** décrits ci-dessous. La différence est que le
Expand All @@ -1978,12 +1983,17 @@ Il est recommandé d’utiliser le filtrage côté serveur pour réduire le nomb
au client et envoyer seulement ce qui est pertinent, et seulement régler les mécanismes côté client,
économisant du bandwidth et du traitement pour tous.

topicPrefix est principalement utilisé lors des transitions de version de protocole,
où l’on souhaite spécifier une version de protocole non-commune des messages d'annonce auquel s’abonner.

Normalement, l’utilisateur spécifie un échange et plusieurs options de subtopic. **subtopic** est ce qui est
normalement utilisé pour indiquer les messages d'annonce d'intérêt. Pour utiliser **subtopic** pour filtrer les produits,

Souvent, l'utilisateur spécifie un échange et plusieurs options de sous-thèmes.
Le **subtopic** est ce qui est normalement utilisé pour indiquer les messages d'intérêt
pour une file d'attente donnée. Si nécessaire, **queueName** et/ou **queueShare**
doivent apparaître plus tôt dans le fichier de configuration pour que le sous-thème
s'applique à la file d'attente sélectionnée.

il faut que la chaîne de caractère subtopic corresponde au chemin relatif du produit.
(les pompes non Sarracenia peuvent avoir d´autres conventions de hiérarchie des sujets.)


Par exemple, en consommant à partir de DD, pour donner la bonne valeur au subtopic, il est possible de
parcourir le site Web **http://dd.weather.gc.ca** et noter tous les répertoires
Expand Down Expand Up @@ -2109,6 +2119,12 @@ rajouté au subtopic pour former une hiérarchie complète de thèmes (topics).
Cette option s’applique aux liaisons d’abonnement.
Indique la version des messages d'annonce reçus dans les subtopics. (V03 fait référence à `<sr3_post.7.html>`_)

topicPrefix sert principalement lors des transitions de format de messages.
Le topicPrefix identifie dans quel version de format les messages sous le thème
sont créés. Sr3 s´attend a des messages v03 par défault, mais il y plein
de sources qui offrent l´ancienne version (nécessitant une topicPrefix de *v02.post*)
pour spécifier l´ancienned version de messages.

topicCopy (défaut: False)
-------------------------

Expand Down
9 changes: 9 additions & 0 deletions sarracenia/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,8 @@ def parse_file(self, cfg, component=None):
saved_lineno=0
self.files.append(cfgfilepath)

self.subtopic_seen=False

for l in open(cfgfilepath, "r").readlines():
lineno+=1
if self.lineno > 0:
Expand Down Expand Up @@ -1646,6 +1648,7 @@ def parse_line(self, component, cfg, cfname, lineno, l ):
logger.error( f"{','.join(self.files)}:{self.lineno} file {v} failed to parse: {ex}" )
logger.debug('Exception details: ', exc_info=True)
elif k in ['subtopic']:
self.subtopic_seen=True
self._parse_binding(v)
elif k in ['topicPrefix']:
if '/' in v :
Expand Down Expand Up @@ -1753,6 +1756,12 @@ def parse_line(self, component, cfg, cfname, lineno, l ):
logger.error( f'{",".join(self.files)}:{lineno} invalid entry {i} in {k}. Must be one of: {set_choices[k]}' )

elif k in str_options:
# queueName warning... is for something that is not an error...
# probably need to remove this warning later... because people could use default queue with subtopic and
# specify a second queue with different bindings... so this warning could be complaining about something
# that is correct. but in every current case, the warning will be helpful.
if ( k == 'queueName' ) and self.subtopic_seen:
logger.warning( f"queueName usually should be before subtopic in configs: subtopic to default queue" )
if ( k == 'directory' ) and not self.download:
logger.info( f"{','.join(self.files)}:{lineno} if download is false, directory has no effect" )

Expand Down
8 changes: 4 additions & 4 deletions sarracenia/config/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def add(self, new_subscription):
for s in self:
if ( s['broker'] == new_subscription['broker'] ) and \
( s['queue']['name'] == new_subscription['queue']['name'] ):
newb = new_subscription['bindings'][0]
for b in s['bindings']:
newb = new_subscription['bindings'][0]
if (b['sub'] != newb['sub']) or (b['prefix'] != newb['prefix']):
s['bindings'].append( { 'exchange': newb['exchange'], \
'prefix':newb['prefix'], 'sub':newb['sub'] } )
if newb == b:
found=True
if not found:
s['bindings'].append( newb )

if not found:
self.append(new_subscription)
Expand Down
26 changes: 25 additions & 1 deletion sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2775,6 +2775,24 @@ def convert1(self,cfg):
v3_cfg.write('#v2 sftp handling is always absolute, sr3 is relative. might need this, remove when all sr3:\n')
v3_cfg.write('#flowcb accept.sftp_absolute\n')

queueName=None

#1st prep pass (for cases when re-ordering needed.)
with open(v2_config_path, 'r') as v2_cfg:
for line in v2_cfg.readlines():
if len(line.strip()) < 1:
continue
if line[0].startswith('#'):
continue
line = line.strip().split()
k = line[0]
if k in synonyms:
k = synonyms[k]
if k in [ 'queueName' ]:
queueName=line[1]

#2nd re-write pass.
subtopicFound=False
with open(v2_config_path, 'r') as v2_cfg:
for line in v2_cfg.readlines():
if len(line.strip()) < 1:
Expand Down Expand Up @@ -2818,7 +2836,13 @@ def convert1(self,cfg):
else:
logger.error( f"unknown checksum spec: {line}")
continue

elif k == 'queueName':
if subtopicFound or not queueName:
continue
elif k == 'subtopic':
if queueName:
v3_cfg.write(f'queueName {queueName}\n')
queueName=None
if (k == 'accept') :
if line[1] == '.*':
accept_all_seen=True
Expand Down

0 comments on commit 2c8c5dc

Please sign in to comment.