From fe478b6e2affebfb64572b3bd555cd80e44dbc9d Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 20 May 2024 23:04:05 -0500 Subject: [PATCH 1/4] fix(scripts): throw error when move to delayed a non-active job --- lib/commands/includes/removeLock.lua | 19 +++++++++++++++++++ lib/commands/moveToDelayed-4.lua | 19 +++++++++---------- lib/job.js | 2 +- lib/scripts.js | 8 ++++++-- 4 files changed, 35 insertions(+), 13 deletions(-) create mode 100644 lib/commands/includes/removeLock.lua diff --git a/lib/commands/includes/removeLock.lua b/lib/commands/includes/removeLock.lua new file mode 100644 index 000000000..d7335f635 --- /dev/null +++ b/lib/commands/includes/removeLock.lua @@ -0,0 +1,19 @@ +local function removeLock(jobKey, stalledKey, token, jobId) + if token ~= "0" then + local lockKey = jobKey .. ':lock' + local lockToken = rcall("GET", lockKey) + if lockToken == token then + rcall("DEL", lockKey) + rcall("SREM", stalledKey, jobId) + else + if lockToken then + -- Lock exists but token does not match + return -6 + else + -- Lock is missing completely + return -2 + end + end + end + return 0 +end diff --git a/lib/commands/moveToDelayed-4.lua b/lib/commands/moveToDelayed-4.lua index bc8e09b51..f7c57d99e 100644 --- a/lib/commands/moveToDelayed-4.lua +++ b/lib/commands/moveToDelayed-4.lua @@ -21,22 +21,21 @@ ]] local rcall = redis.call +-- Includes +--- @include "includes/removeLock" + if rcall("EXISTS", KEYS[3]) == 1 then - -- Check for job lock - if ARGV[3] ~= "0" then - local lockKey = KEYS[3] .. ':lock' - if rcall("GET", lockKey) == ARGV[3] then - rcall("DEL", lockKey) - rcall("SREM", KEYS[4], ARGV[2]) - else - return -2 - end + local errorCode = removeLock(KEYS[3], KEYS[4], ARGV[3], ARGV[2]) + if errorCode < 0 then + return errorCode end + local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[2]) + if numRemovedElements < 1 then return -3 end + local score = tonumber(ARGV[1]) rcall("ZADD", KEYS[2], score, ARGV[2]) rcall("PUBLISH", KEYS[2], (score / 0x1000)) - rcall("LREM", KEYS[1], 0, ARGV[2]) return 0 else diff --git a/lib/job.js b/lib/job.js index 8bf21075a..18b7e03dc 100644 --- a/lib/job.js +++ b/lib/job.js @@ -339,7 +339,7 @@ Job.prototype.moveToFailed = async function(err, ignoreLock) { const results = await multi.exec(); const code = _.last(results)[1]; if (code < 0) { - throw scripts.finishedErrors(code, this.id, command); + throw scripts.finishedErrors(code, this.id, command, 'active'); } }; diff --git a/lib/scripts.js b/lib/scripts.js index 714a62cd5..3436626f8 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -238,7 +238,7 @@ const scripts = { ); return job.queue.client.moveToFinished(args).then(result => { if (result < 0) { - throw scripts.finishedErrors(result, job.id, 'finished'); + throw scripts.finishedErrors(result, job.id, 'finished', 'active'); } else if (result) { return raw2jobData(result); } @@ -246,12 +246,16 @@ const scripts = { }); }, - finishedErrors(code, jobId, command) { + finishedErrors(code, jobId, command, state) { switch (code) { case -1: return new Error('Missing key for job ' + jobId + ' ' + command); case -2: return new Error('Missing lock for job ' + jobId + ' ' + command); + case -3: + return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); + case -6: + return new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`); } }, From 0c9b99d24483ca3648563f3b13979a8cddcaf5bd Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 20 May 2024 23:45:12 -0500 Subject: [PATCH 2/4] test: fix test cases --- test/test_job.js | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/test/test_job.js b/test/test_job.js index ec538fd9e..78c240dae 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -734,9 +734,11 @@ describe('Job', () => { expect(isFailed).to.be(false); expect(job.stacktrace).not.be(null); expect(job.stacktrace.length).to.be(1); - return job.isDelayed().then(isDelayed => { - expect(isDelayed).to.be(true); - }); + return client.lpush(queue.toKey('active'), job.id).then(()=>{ + return job.isDelayed().then(isDelayed => { + expect(isDelayed).to.be(true); + }); + }) }); }); }); @@ -893,7 +895,9 @@ describe('Job', () => { }) .then(state => { expect(state).to.be('completed'); - return client.zrem(queue.toKey('completed'), job.id); + return client.zrem(queue.toKey('completed'), job.id).then(()=>{ + return client.lpush(queue.toKey('active'), job.id) + }); }) .then(() => { return job.moveToDelayed(Date.now() + 10000, true); @@ -907,7 +911,9 @@ describe('Job', () => { }) .then(state => { expect(state).to.be('delayed'); - return client.zrem(queue.toKey('delayed'), job.id); + return client.zrem(queue.toKey('delayed'), job.id).then(()=>{ + return client.lpush(queue.toKey('active'), job.id) + }); }) .then(() => { return job.moveToFailed(new Error('test'), true); From 82c3cf00ac7bda151221175c9b1d1cd75ddb92c2 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 20 May 2024 23:54:15 -0500 Subject: [PATCH 3/4] test: fix test case --- test/test_job.js | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/test_job.js b/test/test_job.js index 78c240dae..1613ba789 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -726,6 +726,9 @@ describe('Job', () => { .then(isFailed => { expect(isFailed).to.be(false); }) + .then(() => { + return scripts.moveToActive(queue); + }) .then(() => { return job.moveToFailed(new Error('test error'), true); }) @@ -734,11 +737,9 @@ describe('Job', () => { expect(isFailed).to.be(false); expect(job.stacktrace).not.be(null); expect(job.stacktrace.length).to.be(1); - return client.lpush(queue.toKey('active'), job.id).then(()=>{ - return job.isDelayed().then(isDelayed => { - expect(isDelayed).to.be(true); - }); - }) + return job.isDelayed().then(isDelayed => { + expect(isDelayed).to.be(true); + }); }); }); }); From 4933d6e86927c57c166eb700e9363b8800c38792 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 20 May 2024 23:55:07 -0500 Subject: [PATCH 4/4] chore: remove spaces --- test/test_job.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_job.js b/test/test_job.js index 1613ba789..76cb729dc 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -739,7 +739,7 @@ describe('Job', () => { expect(job.stacktrace.length).to.be(1); return job.isDelayed().then(isDelayed => { expect(isDelayed).to.be(true); - }); + }); }); }); });