From c3e7c8b26d41d557e0a9712e45e5c4fdd4938c85 Mon Sep 17 00:00:00 2001 From: Reid Sunderland Date: Fri, 20 Dec 2024 17:41:48 +0000 Subject: [PATCH 1/2] Fix DiskQueue bugs fix #1354 --- sarracenia/diskqueue.py | 54 ++++++------ tests/sarracenia/diskqueue_test.py | 129 +++++++++++++++++++++++++++-- 2 files changed, 149 insertions(+), 34 deletions(-) diff --git a/sarracenia/diskqueue.py b/sarracenia/diskqueue.py index 8ce7cc6a6..342e25c51 100755 --- a/sarracenia/diskqueue.py +++ b/sarracenia/diskqueue.py @@ -195,6 +195,7 @@ def _count_msgs(self, file_path) -> int: for line in f: if "{" in line: count +=1 + logger.debug(f"counted {count} msgs in {file_path}") return count @@ -231,35 +232,29 @@ def get(self, maximum_messages_to_get=1): if no message (and new or state file there) we wait for housekeeping to present retry messages """ - - if self.msg_count < 0: - return [] - elif self.msg_count == 0: - try: - os.unlink(self.queue_file) - self.queue_fp.close() - except Exception as ex: - pass - - self.queue_fp=None - self.msg_count=-1 + if self.msg_count == 0 and self.queue_fp is None: return [] ml = [] count = 0 - - # if the retry queue is empty, no sense looping. - mx = self.msg_count if self.msg_count < maximum_messages_to_get else maximum_messages_to_get - - while count < mx: + while count < maximum_messages_to_get: self.queue_fp, message = self.msg_get_from_file( self.queue_fp, self.queue_file) + # FIXME MG as discussed with Peter + # no housekeeping in get ... + # if no message (and new or state file there) + # we wait for housekeeping to present retry messages if not message: - self.msg_count=0 - return + try: + os.unlink(self.queue_file) + except: + pass + self.queue_fp = None + self.msg_count = 0 + #logger.debug("MG DEBUG retry get return None") + break - count += 1 if self.is_expired(message): #logger.error("MG invalid %s" % message) continue @@ -269,9 +264,18 @@ def get(self, maximum_messages_to_get=1): message['_deleteOnPost'].remove('ack_id') ml.append(message) + count += 1 self.msg_count -= count + # after getting the last message from the file, close it + if self.msg_count == 0: + try: + os.unlink(self.queue_file) + except: + pass + self.queue_fp = None + return ml def in_cache(self, message) -> bool: @@ -291,7 +295,7 @@ def in_cache(self, message) -> bool: elif 'pubTime' in message: sumstr = jsonpickle.encode(message['pubTime']) else: - logger.info('no key found for message, cannot add') + logger.warning('no key found for message, cannot add') return False cache_key = urlstr + ' ' + sumstr @@ -374,14 +378,12 @@ def on_housekeeping(self): remove .new rename housekeeping to queue for next period. """ - logger.debug("%s on_housekeeping" % self.name) + logger.debug(f"{self.name} on_housekeeping, {self.msg_count} msgs in queue file, {self.msg_count_new} in new file") # finish retry before reshuffling all retries entries - if os.path.isfile(self.queue_file) and self.queue_fp != None: - logger.info( - "have not finished retry list. Resuming retries with %s" % - self.queue_file) + if (os.path.isfile(self.queue_file) and self.queue_fp != None) or self.msg_count != 0: + logger.info(f"still {self.msg_count} messages in {self.name} list. Resuming retries with {self.queue_file}") return self.now = sarracenia.nowflt() diff --git a/tests/sarracenia/diskqueue_test.py b/tests/sarracenia/diskqueue_test.py index 6b9ca7c8b..077834b9d 100644 --- a/tests/sarracenia/diskqueue_test.py +++ b/tests/sarracenia/diskqueue_test.py @@ -240,25 +240,35 @@ def test_on_housekeeping__FinishRetry(tmp_path, caplog): BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" download_retry = DiskQueue(BaseOptions, 'test_on_housekeeping__FinishRetry') - message = make_message() - - download_retry.queue_fp = open(download_retry.queue_file, 'a') - line = jsonpickle.encode(message) + '\n' - download_retry.queue_fp.write(line + line) - download_retry.queue_fp.flush() - hk_out = download_retry.on_housekeeping() assert hk_out == None + # This should not be logged unless there is actually messages in the queue log_found_notFinished = False + for record in caplog.records: + if "Resuming retries" in record.message: + log_found_notFinished = True + + assert log_found_notFinished == False + m1 = make_message() + download_retry.put([m1]) + + # put message into Queue from new + download_retry.on_housekeeping() + + # run housekeeping again and now it should say it's not done + download_retry.on_housekeeping() + # This should not be logged unless there is actually messages in the queue + log_found_notFinished = False for record in caplog.records: - if "have not finished retry list" in record.message: + if "Resuming retries" in record.message: log_found_notFinished = True assert log_found_notFinished == True + def test_on_housekeeping(tmp_path, caplog): BaseOptions = Options() BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" @@ -290,3 +300,106 @@ def test_on_housekeeping(tmp_path, caplog): assert log_found_HasQueue == True assert log_found_NumMessages == True assert log_found_Elapsed == True + +def test_diskqueue(tmp_path, caplog): + """ DiskQueue integration test, tests the behaviour of the class, mimicking how it's actually used in sr3. + """ + BaseOptions = Options() + BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt" + dq = DiskQueue(BaseOptions, 'DiskQueue_Integration') + m1 = make_message() + m2 = make_message() + m2['pubTime'] = "20200118151049.356378078" + m3 = make_message() + m3['pubTime'] = "20240118151049.356378078" + + dq.put([m1]) + + assert len(dq) == 1 + assert dq.msg_count_new == 1 + assert dq.msg_count == 0 + + dq.put([m2, m3]) + assert len(dq) == 3 + assert dq.msg_count_new == 3 + assert dq.msg_count == 0 + + # should not be possible to get a message until after housekeeping + got = dq.get(2) + assert len(got) == 0 + assert len(dq) == 3 + assert dq.msg_count_new == 3 + assert dq.msg_count == 0 + + # now all messages should be moved from new file to normal file + dq.on_housekeeping() + assert len(dq) == 3 + assert dq.msg_count_new == 0 + assert dq.msg_count == 3 + + # now we can get + got = dq.get(2) + assert len(got) == 2 + assert len(dq) == 1 + assert dq.msg_count_new == 0 + assert dq.msg_count == 1 + + # try running housekeeping again + dq.on_housekeeping() + assert len(dq) == 1 + assert dq.msg_count_new == 0 + assert dq.msg_count == 1 + + log_found_resuming_retries = False + for record in caplog.records: + if "Resuming retries" in record.message: + log_found_resuming_retries = True + assert log_found_resuming_retries + + # add messages back + dq.put([m1, m2]) + assert len(dq) == 3 + assert dq.msg_count_new == 2 + assert dq.msg_count == 1 + + dq.on_housekeeping() + assert len(dq) == 3 + assert dq.msg_count_new == 2 + assert dq.msg_count == 1 + + log_found_resuming_retries = False + for record in caplog.records: + if "Resuming retries" in record.message: + log_found_resuming_retries = True + assert log_found_resuming_retries + + # get 1, now the queue is empty + got = dq.get() + assert len(got) == 1 + assert len(dq) == 2 + assert dq.msg_count_new == 2 + assert dq.msg_count == 0 + + # now housekeeping can move new msgs to regular file + dq.on_housekeeping() + assert len(dq) == 2 + assert dq.msg_count_new == 0 + assert dq.msg_count == 2 + + # add message back before closing, 1 in new, 2 in regular + dq.put([m1]) + assert len(dq) == 3 + assert dq.msg_count_new == 1 + assert dq.msg_count == 2 + + # close and re-open, messages in both new and regular file + dq.close() + dq = DiskQueue(BaseOptions, 'DiskQueue_Integration') + assert len(dq) == 3 + assert dq.msg_count_new == 1 + assert dq.msg_count == 2 + + + + + From 727bbe472e9508e15c0ab84eadfd065ab9e2d8ee Mon Sep 17 00:00:00 2001 From: Reid Sunderland Date: Fri, 20 Dec 2024 19:48:56 +0000 Subject: [PATCH 2/2] change github actions unit tests to u24.04 to try to workaround openssl incompatibility --- .github/workflows/unit-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 29057e35e..903728ef9 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -19,7 +19,7 @@ permissions: jobs: build: - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 name: Unit test timeout-minutes: 10