Skip to content

Commit

Permalink
Issue1299 - New after_gather entry point + better error handling in…
Browse files Browse the repository at this point in the history
… sundew_dirPattern (#1308)

* Add after_gather entry point + add it to renamer

* Have UpdateFieldsAccepted return a bool and wrap sundew_dirPattern in try/except

* Add documentation for new after_gather entry point

* Correct unit tests.
  • Loading branch information
andreleblanc11 authored Dec 1, 2024
1 parent 7c5f470 commit a4af0e2
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 51 deletions.
11 changes: 11 additions & 0 deletions docs/source/Explanation/SarraPluginDev.rst
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,17 @@ for detailed information about call signatures and return values, etc...
| | |
| | |
+---------------------+----------------------------------------------------+
| | |
| after_gather | Called after gather and before filter. |
| (self,worklist) | |
| | Not used often. after_accept should be used |
| | for most use cases. |
| | |
| | after_gather should only really be used when: |
| | - There needs to be a change to the worklist |
| | of messages before attempting to filter. |
| | |
+---------------------+----------------------------------------------------+
| | Called after a transfer has been attempted. |
| after_work | |
| (self,worklist) | All messages are acknowledged by this point. |
Expand Down
3 changes: 3 additions & 0 deletions docs/source/How2Guides/FlowCallbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ the `sarracenia.flowcb <../../sarracenia/flowcb/__init__.py>`_ class.
Briefly, the algorithm has the following steps:

* **gather** -- passively collect notification messages to be processed.

* *after_gather* callback entry point

* **poll** -- actively collect notification messages to be processed.
* **filter** -- apply accept/reject regular expression matches to the notification message list.

Expand Down
3 changes: 3 additions & 0 deletions docs/source/fr/CommentFaire/FlowCallbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ la classe `sarracenia.flowcb <../../sarracenia/flowcb/__init__.py>`_.
En bref, l’algorithme comporte les étapes suivantes :

* **gather** -- collecter passivement les messages de notification à traiter.

* *after_gather* point d’entrée de callback

* **poll** -- collecter activement les messages de notification à traiter.
* **filter** -- appliquer des correspondances d’expression régulière accept/reject à la liste des messages de notification.

Expand Down
13 changes: 13 additions & 0 deletions docs/source/fr/Explication/SarraPluginDev.rst
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,19 @@ pour des informations détaillées sur les signatures d’appel et les valeurs d
| | binaires pour les fichiers volumineux.) |
| | |
+---------------------+----------------------------------------------------+
| | |
| after_gather | Appelé après gather et avant filter (filtre) |
| (self,worklist) | |
| | C'est une option peu utilisée. |
| | after_accept devrait être utilisé pour la |
| | plupart des cas |
| | |
| | after_gather devrait seulement être utilisé |
| | lorsque: |
| | - Un changement doit être fait à la worklist |
| | de messages avant d'atteindre le filtre. |
| | |
+---------------------+----------------------------------------------------+
| | appelé après qu’un transfert a été tenté. |
| after_work | |
| (self,worklist) | A ce point, tous les messages sont reconnus. |
Expand Down
49 changes: 37 additions & 12 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ def sundew_getDestInfos(self, msg, currentFileOption, filename):
"""
def updateFieldsAccepted(self, msg, urlstr, pattern, maskDir,
maskFileOption, mirror, path_strip_count, pstrip, flatten) -> None:
maskFileOption, mirror, path_strip_count, pstrip, flatten) -> bool:
"""
Set new message fields according to values when the message is accepted.
Expand All @@ -818,6 +818,8 @@ def updateFieldsAccepted(self, msg, urlstr, pattern, maskDir,
* pstrip: pattern strip regexp to apply instead of a count.
* flatten: a character to replace path separators with, to change a multi-directory
deep file name into a single long file name
return True on success
"""

Expand Down Expand Up @@ -961,14 +963,22 @@ def updateFieldsAccepted(self, msg, urlstr, pattern, maskDir,

tfname = filename
# when sr_sender did not derive from sr_subscribe it was always called
new_dir = self.o.sundew_dirPattern(pattern, urlstr, tfname, new_dir)
msg.updatePaths(self.o, new_dir, filename)
try:
new_dir = self.o.sundew_dirPattern(pattern, urlstr, tfname, new_dir)
msg.updatePaths(self.o, new_dir, filename)
except Exception as ex:
logger.error( f"sundew_dirPattern crashed: {ex}." )
logger.debug( "details:", exc_info=True )
return False

if maskFileOption:
msg['new_file'] = self.sundew_getDestInfos(msg, maskFileOption, filename)
msg['new_relPath'] = '/'.join( msg['new_relPath'].split('/')[0:-1] + [ msg['new_file'] ] )


return True


def filter(self) -> None:

logger.debug(
Expand Down Expand Up @@ -1068,14 +1078,18 @@ def filter(self) -> None:
(str(mask), strip, urlToMatch))
break


m['_mask'] = mask
m['_deleteOnPost'].add('_mask')

self.updateFieldsAccepted(m, url, pattern, maskDir,
if self.updateFieldsAccepted(m, url, pattern, maskDir,
maskFileOption, mirror, strip,
pstrip, flatten)
pstrip, flatten):
filtered_worklist.append(m)
else:
self.reject(m, 404, "unable to update fields %s" % url)


filtered_worklist.append(m)
break

if not matched:
Expand All @@ -1084,23 +1098,32 @@ def filter(self) -> None:
m['renameUnlink'] = True
m['_deleteOnPost'] |= set(['renameUnlink'])
logger.debug("rename deletion 2 %s" % (m['fileOp']['rename']))
filtered_worklist.append(m)
self.updateFieldsAccepted(m, url, None,

if self.updateFieldsAccepted(m, url, None,
default_accept_directory,
self.o.filename, self.o.mirror,
self.o.strip, self.o.pstrip,
self.o.flatten)
self.o.flatten):
filtered_worklist.append(m)
else:
self.reject(m, 404, "unable to update fields %s" % url)


continue

if self.o.acceptUnmatched:
logger.debug("accept: unmatched pattern=%s" % (url))
# FIXME... missing dir mapping with mirror, strip, etc...
self.updateFieldsAccepted(m, url, None,
if self.updateFieldsAccepted(m, url, None,
default_accept_directory,
self.o.filename, self.o.mirror,
self.o.strip, self.o.pstrip,
self.o.flatten)
filtered_worklist.append(m)
self.o.flatten):

filtered_worklist.append(m)
else:
self.reject(m, 404, "unable to update fields %s" % url)

else:
self.reject(m, 404, "unmatched pattern %s" % url)

Expand Down Expand Up @@ -1151,6 +1174,8 @@ def gather(self) -> None:

return

self._runCallbacksWorklist('after_gather')

# gather is an extended version of poll.
if self.o.component != 'poll':
return
Expand Down
13 changes: 12 additions & 1 deletion sarracenia/flowcb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

entry_points = [

'ack', 'after_accept', 'after_post', 'after_work', 'destfn', 'do_poll',
'ack', 'after_accept', 'after_gather', 'after_post', 'after_work', 'destfn', 'do_poll',
'download', 'gather', 'metricsReport', 'on_cleanup', 'on_declare', 'on_features',
'on_housekeeping', 'on_sanity', 'on_start', 'on_stop',
'please_stop', 'poll', 'post', 'report', 'send',
Expand Down Expand Up @@ -105,6 +105,17 @@ def after_accept(self,worklist) -> None::
and move messages to worklist.rejected to prevent further processing.
do not delete any messages, only move between worklists.
def after_gather(self,worklist) -> None::
Task: operate on worklist.incoming to help decide which messages to process further.
Move messages to worklist.rejected to prevent further processing.
Should only really be used for special use cases when message processing
needs to be done before going through `filter` of the flow algorithm.
Otherwise, after_accept entry point should be used.
def after_work(self,worklist) -> None::
Task: operate on worklist.ok (files which have arrived.)
Expand Down
8 changes: 8 additions & 0 deletions sarracenia/flowcb/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ def after_accept(self, worklist):
if set(['after_accept']) & self.o.logEvents:
logger.info( f"accepted: (lag: {lag:.2f} ) {self._messageAcceptStr(msg)}" )

def after_gather(self, worklist):
    """
    Log the state of the worklist right after the gather step.

    Nothing is logged unless 'after_gather' is present in self.o.logEvents.
    When it is, every message in worklist.incoming is reported as
    "gathered" and every message in worklist.rejected as "rejected".
    """
    # Guard clause: bail out early when this event is not being logged.
    if not (set(['after_gather']) & self.o.logEvents):
        return

    for gathered in worklist.incoming:
        description = self._messagePostStr(gathered)
        logger.info("gathered: %s" % description)

    for refused in worklist.rejected:
        description = self._messagePostStr(refused)
        logger.info("rejected: %s" % description)


def after_post(self, worklist):
if set(['after_post']) & self.o.logEvents:
for msg in worklist.ok:
Expand Down
20 changes: 7 additions & 13 deletions sarracenia/flowcb/rename/raw2bulletin.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self,options) :
self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG'])

# If file was converted, get rid of extensions it had
def after_accept(self,worklist):
def after_gather(self,worklist):

new_worklist = []

Expand Down Expand Up @@ -152,15 +152,8 @@ def after_accept(self,worklist):

# Add current time as new timestamp to filename
new_file = header + "_" + timehandler.strftime('%d%H%M') + "_" + BBB + "_" + stn_id + "_" + seq + "_PROBLEM"

# Write the file manually as the messages don't get posted downstream.
# The message won't also get downloaded further downstream
msg['new_file'] = new_file
new_path = msg['new_dir'] + '/' + msg['new_file']

# with open(new_path, 'w') as f: f.write(data)

logger.error(f"New filename (for problem file): {new_file}")

elif stn_id == None:
new_file = header + "_" + BBB + "_" + '' + "_" + seq + "_PROBLEM"
logger.error(f"New filename (for problem file): {new_file}")
Expand All @@ -169,15 +162,16 @@ def after_accept(self,worklist):
else:
new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq

msg['new_file'] = new_file

# No longer needed
if 'isProblem' in msg:
del(msg['isProblem'])

# msg.updatePaths(self.o, msg['new_dir'], msg['new_file'])
# Need to update the relPath with new filename, because it's not an after_accept. new_dir and new_file don't exist.
parts = msg['relPath'].split('/')
parts[-1] = new_file
msg['relPath'] = '/'.join(parts)

logger.info(f"New filename: {msg['new_file']}")
logger.info(f"New filename: {new_file}")
new_worklist.append(msg)

except Exception as e:
Expand Down
47 changes: 22 additions & 25 deletions tests/sarracenia/flowcb/gather/am__gather_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,16 @@ def test_am_binary_bulletin():
bulletin, firstchars, lines, missing_ahl, station, charset = _get_bulletin_info(message_test1)

bulletinHeader = lines[0].decode('iso-8859-1').replace(' ', '_')
message_test1['new_file'] = bulletinHeader + '__12345'
message_test1['new_dir'] = BaseOptions.directory
message_test1['relPath'] = BaseOptions.directory + bulletinHeader + '__12345'
message_test1['content']['value'] = b64encode(message_test1['content']['value']).decode('ascii')
message_test1["isProblem"] = False

worklist = make_worklist()
worklist.incoming = [message_test1]

# Check renamer.
renamer.after_accept(worklist)
assert worklist.incoming[0]['new_file'] == 'ISAA41_CYWA_030000___00001'
renamer.after_gather(worklist)
assert worklist.incoming[0]['relPath'].split('/')[-1] == 'ISAA41_CYWA_030000___00001'


# Test 2: Check a regular CACN bulletin
Expand All @@ -132,8 +131,7 @@ def test_cacn_regular():
bulletin, firstchars, lines, missing_ahl, station, charset = _get_bulletin_info(message_test2)

bulletinHeader = lines[0].decode('iso-8859-1').replace(' ', '_')
message_test2['new_file'] = bulletinHeader + '__12345'
message_test2['new_dir'] = BaseOptions.directory
message_test2['relPath'] = BaseOptions.directory + bulletinHeader + '__12345'

# Check correcting the bulletin contents of a CACN
new_bulletin, isProblem = am_instance.correctContents(bulletin, firstchars, lines, missing_ahl, station, charset)
Expand All @@ -147,8 +145,8 @@ def test_cacn_regular():
worklist = make_worklist()
worklist.incoming = [message_test2]

renamer.after_accept(worklist)
assert worklist.incoming[0]['new_file'] == 'CACN00_CWAO_021600__WVO_00001'
renamer.after_gather(worklist)
assert worklist.incoming[0]['relPath'].split('/')[-1] == 'CACN00_CWAO_021600__WVO_00001'

# Test 3: Check an erroneous CACN bulletin (missing timestamp in bulletin contents)
def test_cacn_erronous():
Expand Down Expand Up @@ -180,8 +178,8 @@ def test_cacn_erronous():
worklist.incoming = [message_test3]


renamer.after_accept(worklist)
assert re.match('CACN00_CWAO_......__WPK_00001_PROBLEM' , worklist.incoming[0]['new_file'])
renamer.after_gather(worklist)
assert re.match('CACN00_CWAO_......__WPK_00001_PROBLEM' , worklist.incoming[0]['relPath'].split('/')[-1])

# Test 4: Bulletin with double line separator after header (my-header\n\n)
def test_bulletin_double_linesep():
Expand Down Expand Up @@ -212,8 +210,8 @@ def test_bulletin_double_linesep():
worklist = make_worklist()
worklist.incoming = [message_test4]

renamer.after_accept(worklist)
assert message_test4['new_file'] == 'SXCN35_CWVR_021100___00001'
renamer.after_gather(worklist)
assert message_test4['relPath'].split('/')[-1] == 'SXCN35_CWVR_021100___00001'

# Test 5: Bulletin with invalid year in timestamp (Fix: https://github.com/MetPX/sarracenia/pull/973)
def test_bulletin_invalid_timestamp(caplog):
Expand All @@ -230,8 +228,8 @@ def test_bulletin_invalid_timestamp(caplog):
bulletin, firstchars, lines, missing_ahl, station, charset = _get_bulletin_info(message_test5)

bulletinHeader = lines[0].decode('iso-8859-1').replace(' ', '_')
message_test5['new_file'] = bulletinHeader + '__12345'
message_test5['new_dir'] = BaseOptions.directory
message_test5['relPath'].split('/')[-1] = bulletinHeader + '__12345'
message_test5['relPath'].split('/')[-2:] = BaseOptions.directory

new_bulletin, isProblem = am_instance.correctContents(bulletin, firstchars, lines, missing_ahl, station, charset)
assert new_bulletin == b'CACN00 CWAO\nWVO\n100,1024,123,1600,0,100,13.5,5.6,79.4,0.722,11.81,11.74,1.855,6.54,16.76,1544,2.344,14.26,0,375.6,375.6,375.5,375.5,0,11.58,11.24,3.709,13.89,13.16,11.22,11,9.45,11.39,5.033,79.4,0.694,-6999,41.19,5.967,5.887,5.93,6.184,5.64,5.066,5.253,-6999,7.3,0.058,0,5.715,4.569,0,0,1.942,-6999,57.4,0,0.531,-6999,1419,1604,1787,-6999,-6999,-6999,-6999,-6999,1601,-6999,-6999,6,5.921,5.956,6.177,5.643,5.07,5.256,-6999,9.53,11.22,10.09,10.61,125.4,9.1\n'
Expand All @@ -242,7 +240,7 @@ def test_bulletin_invalid_timestamp(caplog):
worklist = make_worklist()
worklist.incoming = [message_test5]

renamer.after_accept(worklist)
renamer.after_gather(worklist)
# We want to make sure the proper errors are raised from the logs
assert 'Unable to fetch header contents. Skipping message' in caplog.text and 'Unable to verify year from julian time.' in caplog.text

Expand Down Expand Up @@ -299,8 +297,8 @@ def test_bulletin_wrong_station():
worklist = make_worklist()
worklist.incoming = [message_test7]

renamer.after_accept(worklist)
assert message_test7['new_file'] == 'UECN99_CYCX_071200___00001_PROBLEM'
renamer.after_gather(worklist)
assert worklist.incoming[0]['relPath'].split('/')[-1] == 'UECN99_CYCX_071200___00001_PROBLEM'

# Test 8: SM Bulletin - Add station mapping + SM/SI bulletin accomodities
def test_SM_bulletin():
Expand Down Expand Up @@ -330,8 +328,8 @@ def test_SM_bulletin():
worklist = make_worklist()
worklist.incoming = [message_test8]

renamer.after_accept(worklist)
assert message_test8['new_file'] == 'SMCN06_CWAO_030000__71816_00001'
renamer.after_gather(worklist)
assert worklist.incoming[0]['relPath'].split('/')[-1] == 'SMCN06_CWAO_030000__71816_00001'

# Test 9: Bulletin with 5 fields in header (invalid)
def test_bulletin_header_five_fileds():
Expand All @@ -347,8 +345,7 @@ def test_bulletin_header_five_fileds():
bulletin, firstchars, lines, missing_ahl, station, charset = _get_bulletin_info(message_test9)

bulletinHeader = lines[0].decode('iso-8859-1').replace(' ', '_')
message_test9['new_file'] = bulletinHeader + '__12345'
message_test9['new_dir'] = BaseOptions.directory
message_test9['relPath'] = BaseOptions.directory + bulletinHeader + '__12345'

# Check correcting the bulletin contents of the bulletin
new_bulletin, isProblem = am_instance.correctContents(bulletin, firstchars, lines, missing_ahl, station, charset)
Expand Down Expand Up @@ -422,8 +419,8 @@ def test_random_bulletin_with_BBB():
worklist = make_worklist()
worklist.incoming = [message_test12]

renamer.after_accept(worklist)
assert message_test12['new_file'] == 'FXCN06_CYTR_230939_AAA__00001'
renamer.after_gather(worklist)
assert worklist.incoming[0]['relPath'].split('/')[-1] == 'FXCN06_CYTR_230939_AAA__00001'

# Test 13: SM Bulletin with BBB - Add station mapping + SM/SI bulletin accomodities + conserve BBB header
def test_SM_bulletin_with_BBB():
Expand Down Expand Up @@ -453,5 +450,5 @@ def test_SM_bulletin_with_BBB():
worklist = make_worklist()
worklist.incoming = [message_test13]

renamer.after_accept(worklist)
assert message_test13['new_file'] == 'SMCN06_CWAO_030000_AAA_71816_00001'
renamer.after_gather(worklist)
assert worklist.incoming[0]['relPath'].split('/')[-1] == 'SMCN06_CWAO_030000_AAA_71816_00001'

0 comments on commit a4af0e2

Please sign in to comment.