Skip to content

Commit

Permalink
fix(queue): differentiate score purpose per state in clean method (#2133
Browse files Browse the repository at this point in the history
) fixes #2124
  • Loading branch information
roggervalf authored Sep 20, 2023
1 parent 3fd185c commit 862f10b
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 13 deletions.
12 changes: 9 additions & 3 deletions src/commands/cleanJobsInSet-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@ local result
if ARGV[4] == "active" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false)
elseif ARGV[4] == "delayed" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"processedOn", "timestamp"})
rangeEnd = "+inf"
result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"processedOn", "timestamp"}, false)
elseif ARGV[4] == "prioritized" then
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"timestamp"})
rangeEnd = "+inf"
result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"timestamp"}, false)
elseif ARGV[4] == "wait" or ARGV[4] == "paused" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true)
else
result = cleanSet(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], limit, {"finishedOn"} )
rangeEnd = ARGV[2]
result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"finishedOn"}, true)
end

rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2])
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/cleanList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
-- Fetch all three of these (in that order) and use the first one that is set so that we'll leave jobs
-- that have been active within the grace period:
jobTS = getTimestamp(jobKey, {"finishedOn", "processedOn", "timestamp"})
if (not jobTS or jobTS < timestamp) then
if (not jobTS or jobTS <= timestamp) then
-- replace the entry with a deletion marker; the actual deletion will
-- occur at the end of the script
rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
Expand Down
16 changes: 11 additions & 5 deletions src/commands/includes/cleanSet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
--- @include "getTimestamp"
--- @include "removeJob"

local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, limit, attributes)
local jobs = getJobsInZset(setKey, rangeStart, rangeEnd, timestamp, limit)
local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes, isFinished)
local jobs = getJobsInZset(setKey, rangeEnd, limit)
local deleted = {}
local deletedCount = 0
local jobTS
Expand All @@ -20,12 +20,18 @@ local function cleanSet(setKey, jobKeyPrefix, rangeStart, rangeEnd, timestamp, l
end

local jobKey = jobKeyPrefix .. job
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS < timestamp) then
if isFinished then
removeJob(job, true, jobKeyPrefix)
deletedCount = deletedCount + 1
table.insert(deleted, job)
else
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS <= timestamp) then
removeJob(job, true, jobKeyPrefix)
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
end
end

Expand Down
7 changes: 3 additions & 4 deletions src/commands/includes/getJobsInZset.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
-- of items in a sorted set only run a single iteration. If we simply used
-- ZRANGE, we may take a long time traversing through jobs that are within the
-- grace period.
local function getJobsInZset(zsetKey, rangeStart, rangeEnd, maxTimestamp, limit)
local function getJobsInZset(zsetKey, rangeEnd, limit)
if limit > 0 then
return rcall("ZRANGEBYSCORE", zsetKey, 0, maxTimestamp, "LIMIT", 0, limit)
return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd, "LIMIT", 0, limit)
else
return rcall("ZRANGE", zsetKey, rangeStart, rangeEnd)
return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd)
end
end

20 changes: 20 additions & 0 deletions tests/test_clean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,26 @@ describe('Cleaner', () => {
expect(count).to.be.eql(0);
});

it('should clean all delayed jobs when limit is given', async () => {
await queue.add('test', { some: 'data' }, { delay: 5000 });
await queue.add('test', { some: 'data' }, { delay: 5000 });
await delay(100);
const jobs = await queue.clean(0, 1000, 'delayed');
expect(jobs.length).to.be.eql(2);
const count = await queue.count();
expect(count).to.be.eql(0);
});

it('should clean all prioritized jobs when limit is given', async () => {
await queue.add('test', { some: 'data' }, { priority: 5000 });
await queue.add('test', { some: 'data' }, { priority: 5001 });
await delay(100);
const jobs = await queue.clean(0, 1000, 'prioritized');
expect(jobs.length).to.be.eql(2);
const count = await queue.count();
expect(count).to.be.eql(0);
});

describe('when prioritized state is provided', async () => {
it('should clean the number of jobs requested', async () => {
await queue.add('test', { some: 'data' }, { priority: 1 }); // as queue is empty, this job will be added to wait
Expand Down

0 comments on commit 862f10b

Please sign in to comment.