diff --git a/src/commands/changePriority-6.lua b/src/commands/changePriority-6.lua index a63c5b3a9d..e89e0b0af0 100644 --- a/src/commands/changePriority-6.lua +++ b/src/commands/changePriority-6.lua @@ -23,29 +23,39 @@ local priority = tonumber(ARGV[1]) local rcall = redis.call -- Includes ---- @include "includes/isQueuePaused" +--- @include "includes/addJobInTargetList" --- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" +--- @include "includes/pushBackJobWithPriority" + +local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey, + priorityCounter, lifo, priority, jobId, paused) + if priority == 0 then + local pushCmd = lifo and 'RPUSH' or 'LPUSH' + addJobInTargetList(targetKey, markerKey, pushCmd, paused, jobId) + else + if lifo then + pushBackJobWithPriority(prioritizedKey, priority, jobId) + else + addJobWithPriority(markerKey, prioritizedKey, priority, jobId, + priorityCounter, paused) + end + end +end if rcall("EXISTS", jobKey) == 1 then local metaKey = KEYS[3] - local isPaused = isQueuePaused(metaKey) + local target, isPaused = getTargetQueueList(metaKey, KEYS[1], KEYS[2]) local markerKey = KEYS[6] local prioritizedKey = KEYS[4] -- Re-add with the new priority if rcall("ZREM", KEYS[4], jobId) > 0 then - addJobWithPriority(markerKey, prioritizedKey, priority, jobId, KEYS[5], - isPaused) - -- If the new priority is 0, then just leave the job where it is in the wait list. - elseif priority > 0 then - -- Job is already in the wait list, we need to re-add it with the new priority. - local target = isPaused and KEYS[2] or KEYS[1] - - local numRemovedElements = rcall("LREM", target, -1, jobId) - if numRemovedElements > 0 then - addJobWithPriority(markerKey, prioritizedKey, priority, jobId, - KEYS[5], isPaused) - end + reAddJobWithNewPriority( prioritizedKey, markerKey, target, + KEYS[5], ARGV[4] == '1', priority, jobId, isPaused) + elseif rcall("LREM", target, -1, jobId) > 0 then + reAddJobWithNewPriority( prioritizedKey, markerKey, target, + KEYS[5], ARGV[4] == '1', priority, jobId, isPaused) end rcall("HSET", jobKey, "priority", priority) diff --git a/tests/test_job.ts b/tests/test_job.ts index fec64641eb..22e8423e62 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -985,46 +985,96 @@ describe('Job', function () { }); describe('.changePriority', () => { - it('can change priority of a job', async function () { - await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); - const job = await Job.create( - queue, - 'test2', - { foo: 'bar' }, - { priority: 16 }, - ); + describe('when job is in wait state', () => { + describe('when lifo option is provided as true', () => { + it('moves job to the head of wait list', async () => { + await queue.pause(); + await Job.create(queue, 'test1', { foo: 'bar' }); + const job = await Job.create( + queue, + 'test2', + { foo: 'bar' }, + { priority: 16 }, + ); - await job.changePriority({ - priority: 1, - }); + await job.changePriority({ + priority: 0, + lifo: true, + }); - const worker = new Worker( - queueName, - async () => { - await delay(20); - }, - { connection, prefix }, - ); - await worker.waitUntilReady(); + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); - const completing = new Promise(resolve => { - worker.on( - 'completed', - after(2, job => { - expect(job.name).to.be.eql('test1'); - resolve(); - }), - ); + await queue.resume(); + + await completing; + + await worker.close(); + }); }); - await completing; + describe('when lifo option is provided as false', () => { + it('moves job to the tail of wait list and has more priority', async () => { + await queue.pause(); + const job = await Job.create( + queue, + 'test1', + { foo: 'bar' }, + { priority: 8 }, + ); + await Job.create(queue, 'test2', { foo: 'bar' }); - await worker.close(); + await job.changePriority({ + priority: 0, + lifo: false, + }); + + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); + + await queue.resume(); + + await completing; + + await worker.close(); + }); + }); }); - describe('when queue is paused', () => { - it('respects new priority', async () => { - await queue.pause(); + describe('when job is in prioritized state', () => { + it('can change priority of a job', async function () { await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); const job = await Job.create( queue, @@ -1056,69 +1106,68 @@ describe('Job', function () { ); }); - await queue.resume(); - await completing; await worker.close(); }); - }); - describe('when lifo option is provided as true', () => { - it('moves job to the head of wait list', async () => { - await queue.pause(); - await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); - const job = await Job.create( - queue, - 'test2', - { foo: 'bar' }, - { priority: 16 }, - ); - - await job.changePriority({ - lifo: true, - }); + describe('when lifo option is provided as true', () => { + it('moves job to the head of prioritized jobs with same priority', async () => { + await queue.pause(); + await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 16 }); + const job = await Job.create( + queue, + 'test2', + { foo: 'bar' }, + { priority: 16 }, + ); - const worker = new Worker( - queueName, - async () => { - await delay(20); - }, - { connection, prefix }, - ); - await worker.waitUntilReady(); + await job.changePriority({ + priority: 16, + lifo: true, + }); - const completing = new Promise(resolve => { - worker.on( - 'completed', - after(2, job => { - expect(job.name).to.be.eql('test1'); - resolve(); - }), + const worker = new Worker( + queueName, + async () => { + await delay(20); + }, + { connection, prefix }, ); - }); + await worker.waitUntilReady(); + + const completing = new Promise(resolve => { + worker.on( + 'completed', + after(2, job => { + expect(job.name).to.be.eql('test1'); + resolve(); + }), + ); + }); - await queue.resume(); + await queue.resume(); - await completing; + await completing; - await worker.close(); + await worker.close(); + }); }); }); - describe('when lifo option is provided as false', () => { - it('moves job to the tail of wait list and has more priority', async () => { + describe('when queue is paused', () => { + it('respects new priority', async () => { await queue.pause(); + await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 }); const job = await Job.create( queue, - 'test1', + 'test2', { foo: 'bar' }, - { priority: 8 }, + { priority: 16 }, ); - await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 }); await job.changePriority({ - lifo: false, + priority: 1, }); const worker = new Worker( @@ -1134,7 +1183,7 @@ describe('Job', function () { worker.on( 'completed', after(2, job => { - expect(job.name).to.be.eql('test2'); + expect(job.name).to.be.eql('test1'); resolve(); }), ); @@ -1148,7 +1197,7 @@ describe('Job', function () { }); }); - describe('when job is not in wait state', () => { + describe('when job is not in wait or prioritized state', () => { it('does not add a record in priority zset', async () => { const job = await Job.create( queue,