From e42bfd2814fc5136b175470c3085355090cc2e01 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Wed, 30 Dec 2020 14:50:43 +0100 Subject: [PATCH] feat: add support for manually processing jobs fixes #327 --- src/classes/worker.ts | 46 +++++++++++++++++++++-------------------- src/test/test_worker.ts | 44 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 22 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 7555787dc8..b2067ec1d1 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -39,7 +39,7 @@ export class Worker< private processing: Map | string>, string>; // { [index: number]: Promise } = {}; constructor( name: string, - processor: string | Processor, + processor?: string | Processor, opts: WorkerOptions = {}, ) { super(name, opts); @@ -62,29 +62,31 @@ export class Worker< ); this.blockingConnection.on('error', this.emit.bind(this, 'error')); - if (typeof processor === 'function') { - this.processFn = processor; - } else { - // SANDBOXED - const supportedFileTypes = ['.js', '.ts', '.flow']; - const processorFile = - processor + - (supportedFileTypes.includes(path.extname(processor)) ? '' : '.js'); - - if (!fs.existsSync(processorFile)) { - // TODO are we forced to use sync api here? - throw new Error(`File ${processorFile} does not exist`); + if (processor) { + if (typeof processor === 'function') { + this.processFn = processor; + } else { + // SANDBOXED + const supportedFileTypes = ['.js', '.ts', '.flow']; + const processorFile = + processor + + (supportedFileTypes.includes(path.extname(processor)) ? '' : '.js'); + + if (!fs.existsSync(processorFile)) { + // TODO are we forced to use sync api here? + throw new Error(`File ${processorFile} does not exist`); + } + + this.childPool = this.childPool || new ChildPool(); + this.processFn = sandbox(processor, this.childPool).bind(this); } + this.timerManager = new TimerManager(); - this.childPool = this.childPool || new ChildPool(); - this.processFn = sandbox(processor, this.childPool).bind(this); + /* tslint:disable: no-floating-promises */ + this.run().catch(error => { + console.error(error); + }); } - this.timerManager = new TimerManager(); - - /* tslint:disable: no-floating-promises */ - this.run().catch(error => { - console.error(error); - }); this.on('error', err => console.error(err)); } @@ -412,7 +414,7 @@ export class Worker< return closePoolPromise; }) .finally(() => client.disconnect()) - .finally(() => this.timerManager.clearAllTimers()) + .finally(() => this.timerManager && this.timerManager.clearAllTimers()) .finally(() => this.emit('closed')); })(); return this.closing; diff --git a/src/test/test_worker.ts b/src/test/test_worker.ts index 960c3f0783..92a05c0131 100644 --- a/src/test/test_worker.ts +++ b/src/test/test_worker.ts @@ -1630,4 +1630,48 @@ describe('workers', function() { }); */ }); + + describe('Manually process jobs', () => { + it('should allow to complete jobs manually', async () => { + const worker = new Worker(queueName); + const token = 'my-token'; + + await queue.add('test', { foo: 'bar' }); + + const job = (await worker.getNextJob(token)) as Job; + + const isActive = await job.isActive(); + expect(isActive).to.be.equal(true); + + await job.moveToCompleted('return value', token); + + const isCompleted = await job.isCompleted(); + + expect(isCompleted).to.be.equal(true); + + await worker.close(); + }); + + it('should allow to fail jobs manually', async () => { + const worker = new Worker(queueName); + const token = 'my-token'; + + await queue.add('test', { foo: 'bar' }); + + const job = (await worker.getNextJob(token)) as Job; + + const isActive = await job.isActive(); + expect(isActive).to.be.equal(true); + + await job.moveToFailed(new Error('job failed for some reason'), token); + + const isCompleted = await job.isCompleted(); + const isFailed = await job.isFailed(); + + expect(isCompleted).to.be.equal(false); + expect(isFailed).to.be.equal(true); + + await worker.close(); + }); + }); });