From 58e43c3c191d008e17423abb4121c5ae09606e0f Mon Sep 17 00:00:00 2001 From: fgozdz Date: Thu, 28 Nov 2024 10:40:42 +0100 Subject: [PATCH 1/2] feat(job-scheduler): tests with drain and promote for job scheduler --- tests/test_job_scheduler.ts | 38 +++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index f0f8ad5e4d..f4f31fe563 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1896,4 +1896,42 @@ describe('Job Scheduler', function () { await processing; await worker.close(); }); + + it('should schedule next repeatable job after promote', async function () { + const worker = new Worker(queueName, async () => {}, { connection }); + await worker.waitUntilReady(); + await queue.upsertJobScheduler('scheduler-test', { every: 50000 }); + + await queue.promoteJobs(); + + expect( + (await queue.getDelayedCount()) == 1 || + (await queue.getWaitingCount()) == 1 || + (await queue.getActiveCount()) == 1, + ).to.be.true; + await queue.removeJobScheduler('scheduler-test'); + await worker.close(); + }); + + it('worker should start processing repeatable jobs after drain', async function () { + await queue.upsertJobScheduler('scheduler-test', { + every: 50000, + immediately: true, + }); + const worker = new Worker(queueName, async () => {}, { connection }); + await worker.waitUntilReady(); + + await queue.drain(true); + + await queue.upsertJobScheduler('scheduler-test', { + every: 50000, + immediately: true, + }); + + expect(worker.isRunning()).to.be.true; + expect(await queue.getDelayedCount()).to.equal(1); + + await queue.removeJobScheduler('scheduler-test'); + await worker.close(); + }); }); From 86476b554c47db52fc976a014da8bf0038b9650f Mon Sep 17 00:00:00 2001 From: fgozdz Date: Fri, 29 Nov 2024 15:56:48 +0100 Subject: [PATCH 2/2] feat(job-scheduler): remove worker and catch for the completed state --- tests/test_job_scheduler.ts | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index f4f31fe563..bcdfbfdd2e 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1898,24 +1898,17 @@ describe('Job Scheduler', function () { }); it('should schedule next repeatable job after promote', async function () { - const worker = new Worker(queueName, async () => {}, { connection }); - await worker.waitUntilReady(); await queue.upsertJobScheduler('scheduler-test', { every: 50000 }); await queue.promoteJobs(); - expect( - (await queue.getDelayedCount()) == 1 || - (await queue.getWaitingCount()) == 1 || - (await queue.getActiveCount()) == 1, - ).to.be.true; - await queue.removeJobScheduler('scheduler-test'); - await worker.close(); + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.be.equal(1); }); it('worker should start processing repeatable jobs after drain', async function () { await queue.upsertJobScheduler('scheduler-test', { - every: 50000, + pattern: '* * * * *', immediately: true, }); const worker = new Worker(queueName, async () => {}, { connection }); @@ -1924,14 +1917,28 @@ describe('Job Scheduler', function () { await queue.drain(true); await queue.upsertJobScheduler('scheduler-test', { - every: 50000, + pattern: '* * * * *', immediately: true, }); - expect(worker.isRunning()).to.be.true; - expect(await queue.getDelayedCount()).to.equal(1); + const completing = new Promise((resolve, reject) => { + worker.once('completed', async job => { + try { + expect(job).to.be.ok; + expect(job.data.foo).to.be.eql('bar'); + } catch (err) { + reject(err); + } + resolve(); + }); + }); + + const job = await queue.add('test', { foo: 'bar' }); + expect(job.id).to.be.ok; + expect(job.data.foo).to.be.eql('bar'); + + await completing; - await queue.removeJobScheduler('scheduler-test'); await worker.close(); }); });