diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 2400073427..743af75b72 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -37,7 +37,7 @@ export class JobScheduler extends QueueBase { jobName: N, jobData: T, opts: JobSchedulerTemplateOptions, - { override }: { override: boolean }, + { override, producerId }: { override: boolean; producerId?: string }, ): Promise | undefined> { const { every, pattern, offset } = repeatOpts; @@ -171,6 +171,7 @@ export class JobScheduler extends QueueBase { }, jobData, iterationCount, + producerId, ); const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][] @@ -206,6 +207,8 @@ export class JobScheduler extends QueueBase { opts: JobsOptions, data: T, currentCount: number, + // The job id of the job that produced this next iteration + producerId?: string, ) { // // Generate unique job id for this iteration. @@ -232,6 +235,11 @@ export class JobScheduler extends QueueBase { const job = new this.Job(this, name, data, mergedOpts, jobId); job.addJob(client); + if (producerId) { + const producerJobKey = this.toKey(producerId); + client.hset(producerJobKey, 'nrjid', job.id); + } + return job; } diff --git a/src/classes/job.ts b/src/classes/job.ts index 531839e62c..19d96d3921 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -168,6 +168,12 @@ export class Job< */ repeatJobKey?: string; + /** + * Produced next repetable job Id. + * + */ + nextRepeatableJobId?: string; + /** * The token used for locking this job. */ @@ -384,6 +390,10 @@ export class Job< job.processedBy = json.pb; } + if (json.nrjid) { + job.nextRepeatableJobId = json.nrjid; + } + return job; } @@ -493,6 +503,7 @@ export class Job< deduplicationId: this.deduplicationId, repeatJobKey: this.repeatJobKey, returnvalue: JSON.stringify(this.returnvalue), + nrjid: this.nextRepeatableJobId, }); } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index f68811c589..b79ddf4648 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -788,7 +788,7 @@ will never work with more accuracy than 1ms. */ job.token = token; // Add next scheduled job if necessary. - if (job.opts.repeat) { + if (job.opts.repeat && !job.nextRepeatableJobId) { // Use new job scheduler if possible if (job.repeatJobKey) { const jobScheduler = await this.jobScheduler; @@ -798,7 +798,7 @@ will never work with more accuracy than 1ms. */ job.name, job.data, job.opts, - { override: false }, + { override: false, producerId: job.id }, ); } else { const repeat = await this.repeat; @@ -835,6 +835,8 @@ will never work with more accuracy than 1ms. */ }); const handleCompleted = async (result: ResultType) => { + jobsInProgress.delete(inProgressItem); + if (!this.connection.closing) { const completed = await job.moveToCompleted( result, @@ -855,6 +857,8 @@ will never work with more accuracy than 1ms. */ }; const handleFailed = async (err: Error) => { + jobsInProgress.delete(inProgressItem); + if (!this.connection.closing) { try { // Check if the job was manually rate-limited @@ -911,8 +915,6 @@ will never work with more accuracy than 1ms. */ [TelemetryAttributes.JobFinishedTimestamp]: Date.now(), [TelemetryAttributes.JobProcessedTimestamp]: processedOn, }); - - jobsInProgress.delete(inProgressItem); } }, srcPropagationMedatada, diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 25ad335145..e27c302432 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -18,6 +18,7 @@ export interface JobJson { parent?: ParentKeys; parentKey?: string; repeatJobKey?: string; + nextRepeatableJobKey?: string; debounceId?: string; deduplicationId?: string; processedBy?: string; @@ -41,6 +42,7 @@ export interface JobJsonRaw { parent?: string; deid?: string; rjk?: string; + nrjid?: string; atm?: string; ats?: string; pb?: string; // Worker name diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 1b3781ae04..c2da779a83 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1636,20 +1636,24 @@ describe('Job Scheduler', function () { let isFirstRun = true; - const worker = new Worker( - queueName, - async () => { - this.clock.tick(177); - if (isFirstRun) { - isFirstRun = false; - throw new Error('failed'); - } - }, - { - connection, - prefix, - }, - ); + let worker; + const processingAfterFailing = new Promise(resolve => { + worker = new Worker( + queueName, + async () => { + this.clock.tick(177); + if (isFirstRun) { + isFirstRun = false; + throw new Error('failed'); + } + resolve(); + }, + { + connection, + prefix, + }, + ); + }); const failing = new Promise(resolve => { worker.on('failed', async () => { @@ -1658,26 +1662,40 @@ describe('Job Scheduler', function () { }); const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); - const delayedCount = await queue.getDelayedCount(); - expect(delayedCount).to.be.equal(1); await repeatableJob!.promote(); + + const delayedCountBeforeFailing = await queue.getDelayedCount(); + expect(delayedCountBeforeFailing).to.be.equal(0); + await failing; const failedCount = await queue.getFailedCount(); expect(failedCount).to.be.equal(1); + const delayedCountAfterFailing = await queue.getDelayedCount(); + expect(delayedCountAfterFailing).to.be.equal(1); + // Retry the failed job this.clock.tick(1143); await queue.retryJobs({ state: 'failed' }); const failedCountAfterRetry = await queue.getFailedCount(); expect(failedCountAfterRetry).to.be.equal(0); + await processingAfterFailing; + + await worker.close(); + const delayedCount2 = await queue.getDelayedCount(); expect(delayedCount2).to.be.equal(1); + + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.be.equal(0); }); it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () { + let expectError; + const date = new Date('2017-02-07 9:24:00'); this.clock.setSystemTime(date); @@ -1692,6 +1710,13 @@ describe('Job Scheduler', function () { async () => { this.clock.tick(177); + try { + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + } catch (error) { + expectError = error; + } + if (isFirstRun) { isFirstRun = false; throw new Error('failed'); @@ -1731,6 +1756,14 @@ describe('Job Scheduler', function () { const failedCountAfterRetry = await queue.getFailedCount(); expect(failedCountAfterRetry).to.be.equal(0); + this.clock.tick(177); + + await worker.close(); + + if (expectError) { + throw expectError; + } + const delayedCount2 = await queue.getDelayedCount(); expect(delayedCount2).to.be.equal(1); });