Fix DiskQueue bugs #1356

Merged 2 commits on Dec 20, 2024
2 changes: 1 addition & 1 deletion .github/workflows/unit-test.yml
@@ -19,7 +19,7 @@ permissions:

jobs:
build:
runs-on: ubuntu-22.04
runs-on: ubuntu-24.04

name: Unit test
timeout-minutes: 10
54 changes: 28 additions & 26 deletions sarracenia/diskqueue.py
@@ -195,6 +195,7 @@ def _count_msgs(self, file_path) -> int:
for line in f:
if "{" in line:
count +=1
logger.debug(f"counted {count} msgs in {file_path}")

return count

@@ -231,35 +232,29 @@ def get(self, maximum_messages_to_get=1):
if no message (and new or state file there)
we wait for housekeeping to present retry messages
"""

if self.msg_count < 0:
return []
elif self.msg_count == 0:
try:
os.unlink(self.queue_file)
self.queue_fp.close()
except Exception as ex:
pass

self.queue_fp=None
self.msg_count=-1
if self.msg_count == 0 and self.queue_fp is None:
return []

ml = []
count = 0

# if the retry queue is empty, no sense looping.
mx = self.msg_count if self.msg_count < maximum_messages_to_get else maximum_messages_to_get

while count < mx:
while count < maximum_messages_to_get:
self.queue_fp, message = self.msg_get_from_file(
self.queue_fp, self.queue_file)

# FIXME MG as discussed with Peter
# no housekeeping in get ...
# if no message (and new or state file there)
# we wait for housekeeping to present retry messages
if not message:
self.msg_count=0
return
try:
os.unlink(self.queue_file)
except:
pass
self.queue_fp = None
self.msg_count = 0
#logger.debug("MG DEBUG retry get return None")
break

count += 1
if self.is_expired(message):
#logger.error("MG invalid %s" % message)
continue
@@ -269,9 +264,18 @@ def get(self, maximum_messages_to_get=1):
message['_deleteOnPost'].remove('ack_id')

ml.append(message)
count += 1

self.msg_count -= count

# after getting the last message from the file, close it
if self.msg_count == 0:
try:
os.unlink(self.queue_file)
except:
pass
self.queue_fp = None

return ml

def in_cache(self, message) -> bool:
@@ -291,7 +295,7 @@ def in_cache(self, message) -> bool:
elif 'pubTime' in message:
sumstr = jsonpickle.encode(message['pubTime'])
else:
logger.info('no key found for message, cannot add')
logger.warning('no key found for message, cannot add')
return False

cache_key = urlstr + ' ' + sumstr
@@ -374,14 +378,12 @@ def on_housekeeping(self):
remove .new
rename housekeeping to queue for next period.
"""
logger.debug("%s on_housekeeping" % self.name)
logger.debug(f"{self.name} on_housekeeping, {self.msg_count} msgs in queue file, {self.msg_count_new} in new file")

# finish retry before reshuffling all retries entries

if os.path.isfile(self.queue_file) and self.queue_fp != None:
logger.info(
"have not finished retry list. Resuming retries with %s" %
self.queue_file)
if (os.path.isfile(self.queue_file) and self.queue_fp != None) or self.msg_count != 0:
logger.info(f"still {self.msg_count} messages in {self.name} list. Resuming retries with {self.queue_file}")
return

self.now = sarracenia.nowflt()
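The core of the fix above is the ordering of teardown in `get()`: the queue file is now unlinked and `queue_fp` reset only after the last queued message has actually been returned, and `on_housekeeping()` declines to reshuffle while `msg_count` is still non-zero. Below is a minimal sketch of how a caller would exercise that lifecycle, assuming only the `DiskQueue` API visible in this diff (`put`, `on_housekeeping`, `get`); the function name and options object are illustrative, not part of the patch.

```python
# A minimal sketch of the intended DiskQueue lifecycle, assuming the API shown
# in this diff. drain_retries() and the options argument are illustrative only.
from sarracenia.diskqueue import DiskQueue

def drain_retries(options, name='example_retry_queue'):
    dq = DiskQueue(options, name)

    # put() appends messages to the "new" file; they are not retrievable yet.
    # dq.put([some_message])

    # on_housekeeping() promotes the "new" file into the active queue file,
    # but with this patch it declines to do so while msg_count is non-zero,
    # i.e. while the previous queue file has not been fully drained.
    dq.on_housekeeping()

    # get() returns up to maximum_messages_to_get messages per call. The queue
    # file is now unlinked only after its last message has actually been read.
    while True:
        batch = dq.get(maximum_messages_to_get=10)
        if not batch:
            break
        for message in batch:
            pass  # retry the work for this message here
```

The integration test added below walks through this same sequence, checking `msg_count` and `msg_count_new` after each `put`, `get`, and `on_housekeeping` call.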
129 changes: 121 additions & 8 deletions tests/sarracenia/diskqueue_test.py
@@ -240,25 +240,35 @@ def test_on_housekeeping__FinishRetry(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
download_retry = DiskQueue(BaseOptions, 'test_on_housekeeping__FinishRetry')

message = make_message()

download_retry.queue_fp = open(download_retry.queue_file, 'a')
line = jsonpickle.encode(message) + '\n'
download_retry.queue_fp.write(line + line)
download_retry.queue_fp.flush()

hk_out = download_retry.on_housekeeping()

assert hk_out == None

# This should not be logged unless there are actually messages in the queue
log_found_notFinished = False
for record in caplog.records:
if "Resuming retries" in record.message:
log_found_notFinished = True

assert log_found_notFinished == False

m1 = make_message()
download_retry.put([m1])

# put message into Queue from new
download_retry.on_housekeeping()

# run housekeeping again and now it should say it's not done
download_retry.on_housekeeping()
# This should not be logged unless there are actually messages in the queue
log_found_notFinished = False
for record in caplog.records:
if "have not finished retry list" in record.message:
if "Resuming retries" in record.message:
log_found_notFinished = True

assert log_found_notFinished == True


def test_on_housekeeping(tmp_path, caplog):
BaseOptions = Options()
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
@@ -290,3 +300,106 @@ def test_on_housekeeping(tmp_path, caplog):
assert log_found_HasQueue == True
assert log_found_NumMessages == True
assert log_found_Elapsed == True

def test_diskqueue(tmp_path, caplog):
""" DiskQueue integration test, tests the behaviour of the class, mimicking how it's actually used in sr3.
"""
BaseOptions = Options()
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
dq = DiskQueue(BaseOptions, 'DiskQueue_Integration')
m1 = make_message()
m2 = make_message()
m2['pubTime'] = "20200118151049.356378078"
m3 = make_message()
m3['pubTime'] = "20240118151049.356378078"

dq.put([m1])

assert len(dq) == 1
assert dq.msg_count_new == 1
assert dq.msg_count == 0

dq.put([m2, m3])
assert len(dq) == 3
assert dq.msg_count_new == 3
assert dq.msg_count == 0

# should not be possible to get a message until after housekeeping
got = dq.get(2)
assert len(got) == 0
assert len(dq) == 3
assert dq.msg_count_new == 3
assert dq.msg_count == 0

# now all messages should be moved from new file to normal file
dq.on_housekeeping()
assert len(dq) == 3
assert dq.msg_count_new == 0
assert dq.msg_count == 3

# now we can get
got = dq.get(2)
assert len(got) == 2
assert len(dq) == 1
assert dq.msg_count_new == 0
assert dq.msg_count == 1

# try running housekeeping again
dq.on_housekeeping()
assert len(dq) == 1
assert dq.msg_count_new == 0
assert dq.msg_count == 1

log_found_resuming_retries = False
for record in caplog.records:
if "Resuming retries" in record.message:
log_found_resuming_retries = True
assert log_found_resuming_retries

# add messages back
dq.put([m1, m2])
assert len(dq) == 3
assert dq.msg_count_new == 2
assert dq.msg_count == 1

dq.on_housekeeping()
assert len(dq) == 3
assert dq.msg_count_new == 2
assert dq.msg_count == 1

log_found_resuming_retries = False
for record in caplog.records:
if "Resuming retries" in record.message:
log_found_resuming_retries = True
assert log_found_resuming_retries

# get 1, now the queue is empty
got = dq.get()
assert len(got) == 1
assert len(dq) == 2
assert dq.msg_count_new == 2
assert dq.msg_count == 0

# now housekeeping can move new msgs to regular file
dq.on_housekeeping()
assert len(dq) == 2
assert dq.msg_count_new == 0
assert dq.msg_count == 2

# add message back before closing, 1 in new, 2 in regular
dq.put([m1])
assert len(dq) == 3
assert dq.msg_count_new == 1
assert dq.msg_count == 2

# close and re-open, messages in both new and regular file
dq.close()
dq = DiskQueue(BaseOptions, 'DiskQueue_Integration')
assert len(dq) == 3
assert dq.msg_count_new == 1
assert dq.msg_count == 2
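For anyone verifying the change locally, the new integration test can presumably be run on its own with something like `pytest tests/sarracenia/diskqueue_test.py -k test_diskqueue` from a checkout of the repository, assuming pytest and the project's test dependencies are installed.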




