diff --git a/src/classes/worker.ts b/src/classes/worker.ts index acc6f1d533..f9a2ce34ce 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -1042,7 +1042,6 @@ will never work with more accuracy than 1ms. */ } clearTimeout(this.extendLocksTimer); - //clearTimeout(this.stalledCheckTimer); this.stalledCheckStopper?.(); this.closed = true; @@ -1249,6 +1248,7 @@ will never work with more accuracy than 1ms. */ this.emit('stalled', jobId, 'active'); }); + // Todo: check if there any listeners on failed event const jobPromises: Promise>[] = []; for (let i = 0; i < failed.length; i++) { jobPromises.push( diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 89d81bad90..a1093895fd 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1406,6 +1406,255 @@ describe('Job Scheduler', function () { }); }); + describe('when repeatable job fails', function () { + it('should continue repeating', async function () { + const repeatOpts = { + pattern: '0 * 1 * *', + }; + + const worker = new Worker( + queueName, + async () => { + throw new Error('failed'); + }, + { + connection, + prefix, + }, + ); + + const failing = new Promise(resolve => { + worker.on('failed', () => { + resolve(); + }); + }); + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + await failing; + + const failedCount = await queue.getFailedCount(); + expect(failedCount).to.be.equal(1); + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + + const jobSchedulers = await queue.getJobSchedulers(); + + const count = await queue.count(); + expect(count).to.be.equal(1); + expect(jobSchedulers).to.have.length(1); + await worker.close(); + }); + + it('should not create a new delayed job if the failed job is retried with retryJobs', async function () { + const repeatOpts = { + every: 579, + }; + + let isFirstRun = true; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(177); + if (isFirstRun) { + isFirstRun = false; + throw new Error('failed'); + } + }, + { + connection, + prefix, + }, + ); + + const failing = new Promise(resolve => { + worker.on('failed', async () => { + resolve(); + }); + }); + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + await failing; + + const failedCount = await queue.getFailedCount(); + expect(failedCount).to.be.equal(1); + + // Retry the failed job + this.clock.tick(1143); + await queue.retryJobs({ state: 'failed' }); + const failedCountAfterRetry = await queue.getFailedCount(); + expect(failedCountAfterRetry).to.be.equal(0); + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + }); + + it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () { + const repeatOpts = { + every: 477, + }; + + let isFirstRun = true; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(177); + + if (isFirstRun) { + isFirstRun = false; + throw new Error('failed'); + } + }, + { + connection, + prefix, + }, + ); + + const failing = new Promise(resolve => { + worker.on('failed', async () => { + resolve(); + }); + }); + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + + this.clock.tick(177); + + await failing; + + this.clock.tick(177); + + const failedJobs = await queue.getFailed(); + expect(failedJobs.length).to.be.equal(1); + + // Retry the failed job + const failedJob = await queue.getJob(failedJobs[0].id); + await failedJob!.retry(); + const failedCountAfterRetry = await queue.getFailedCount(); + expect(failedCountAfterRetry).to.be.equal(0); + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + }); + + it('should not create a new delayed job if the failed job is stalled and moved back to wait', async function () { + // Note, this test is expected to throw an exception like this: + // "Error: Missing lock for job repeat:test:1486455840000. moveToFinished" + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + const repeatOpts = { + every: 2000, + }; + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + expect(repeatableJob).to.be.ok; + + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + + let resolveCompleting: () => void; + const complettingJob = new Promise(resolve => { + resolveCompleting = resolve; + }); + + let worker: Worker; + const processing = new Promise(resolve => { + worker = new Worker( + queueName, + async () => { + resolve(); + return complettingJob; + }, + { + connection, + prefix, + skipLockRenewal: true, + skipStalledCheck: true, + }, + ); + }); + + await processing; + + // force remove the lock + const client = await queue.client; + const lockKey = `${prefix}:${queueName}:${repeatableJob!.id}:lock`; + await client.del(lockKey); + + const stalledCheckerKey = `${prefix}:${queueName}:stalled-check`; + await client.del(stalledCheckerKey); + + const scripts = (worker!).scripts; + let [failed, stalled] = await scripts.moveStalledJobsToWait(); + + await client.del(stalledCheckerKey); + + [failed, stalled] = await scripts.moveStalledJobsToWait(); + + const waitingJobs = await queue.getWaiting(); + expect(waitingJobs.length).to.be.equal(1); + + await this.clock.tick(500); + + resolveCompleting!(); + await worker!.close(); + + await this.clock.tick(500); + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + + let completedJobs = await queue.getCompleted(); + expect(completedJobs.length).to.be.equal(0); + + const processing2 = new Promise(resolve => { + worker = new Worker( + queueName, + async () => { + resolve(); + }, + { + connection, + prefix, + skipLockRenewal: true, + skipStalledCheck: true, + }, + ); + }); + + await processing2; + + await worker!.close(); + + completedJobs = await queue.getCompleted(); + expect(completedJobs.length).to.be.equal(1); + + const waitingJobs2 = await queue.getWaiting(); + expect(waitingJobs2.length).to.be.equal(0); + + const delayedCount3 = await queue.getDelayedCount(); + expect(delayedCount3).to.be.equal(1); + }); + }); + describe('when every option is provided', function () { it('should keep only one delayed job if adding a new repeatable job with the same id', async function () { const date = new Date('2017-02-07 9:24:00'); diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 80bd41a92d..446e6121a1 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -37,6 +37,21 @@ describe('queues', function () { await connection.quit(); }); + describe('use generics', function () { + it('should be able to use generics', async function () { + const queue = new Queue<{ foo: string; bar: number }>(queueName, { + prefix, + connection, + }); + + const job = await queue.add(queueName, { foo: 'bar', bar: 1 }); + const job2 = await queue.getJob(job.id!); + expect(job2?.data.foo).to.be.eql('bar'); + expect(job2?.data.bar).to.be.eql(1); + await queue.close(); + }); + }); + it('should return the queue version', async () => { const queue = new Queue(queueName, { connection }); const version = await queue.getVersion();