Skip to content

Commit

Permalink
fix: job finish queue events race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
airhorns authored Feb 1, 2021
1 parent 78a16a1 commit 355bca5
Showing 1 changed file with 42 additions and 37 deletions.
79 changes: 42 additions & 37 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,47 +360,52 @@ export class Job<T = any, R = any, N extends string = string> {
await this.queue.waitUntilReady();

const jobId = this.id;
const status = await Scripts.isFinished(this.queue, jobId);
const finished = status > 0;
if (finished) {
const job = await Job.fromId(this.queue, this.id);
if (status == 2) {
throw new Error(job.failedReason);
} else {
return job.returnvalue;
return new Promise<any>(async (resolve, reject) => {
let timeout: NodeJS.Timeout;
if (ttl) {
timeout = setTimeout(() => onFailed('timedout'), ttl);
}
} else {
return new Promise((resolve, reject) => {
let timeout: NodeJS.Timeout;
if (ttl) {
timeout = setTimeout(() => onFailed('timedout'), ttl);
}

function onCompleted(args: any) {
removeListeners();
resolve(args.returnvalue);
}

function onFailed(args: any) {
removeListeners();
reject(new Error(args.failedReason || args));
}

const completedEvent = `completed:${jobId}`;
const failedEvent = `failed:${jobId}`;
function onCompleted(args: any) {
removeListeners();
resolve(args.returnvalue);
}

queueEvents.on(completedEvent, onCompleted);
queueEvents.on(failedEvent, onFailed);
this.queue.on('closing', onFailed);
function onFailed(args: any) {
removeListeners();
reject(new Error(args.failedReason || args));
}

const removeListeners = () => {
clearInterval(timeout);
queueEvents.removeListener(completedEvent, onCompleted);
queueEvents.removeListener(failedEvent, onFailed);
this.queue.removeListener('closing', onFailed);
};
});
}
const completedEvent = `completed:${jobId}`;
const failedEvent = `failed:${jobId}`;

queueEvents.on(completedEvent, onCompleted);
queueEvents.on(failedEvent, onFailed);
this.queue.on('closing', onFailed);

const removeListeners = () => {
clearInterval(timeout);
queueEvents.removeListener(completedEvent, onCompleted);
queueEvents.removeListener(failedEvent, onFailed);
this.queue.removeListener('closing', onFailed);
};

// Poll once right now to see if the job has already finished. The job may have been completed before we were able
// to register the event handlers on the QueueEvents, so we check here to make sure we're not waiting for an event
// that has already happened. We block checking the job until the queue events object is actually listening to
// Redis so there's no chance that it will miss events.
await queueEvents.waitUntilReady();
const status = await Scripts.isFinished(this.queue, jobId);
const finished = status > 0;
if (finished) {
const job = await Job.fromId(this.queue, this.id);
if (status == 2) {
onFailed(job);
} else {
onCompleted(job);
}
}
});
}

moveToDelayed(timestamp: number) {
Expand Down

0 comments on commit 355bca5

Please sign in to comment.