Skip to content

Commit

Permalink
feat: add support for manually processing jobs fixes #327
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Dec 30, 2020
1 parent ec6634f commit e42bfd2
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 22 deletions.
46 changes: 24 additions & 22 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class Worker<
private processing: Map<Promise<Job<T, R, N> | string>, string>; // { [index: number]: Promise<Job | void> } = {};
constructor(
name: string,
processor: string | Processor<T, R, N>,
processor?: string | Processor<T, R, N>,
opts: WorkerOptions = {},
) {
super(name, opts);
Expand All @@ -62,29 +62,31 @@ export class Worker<
);
this.blockingConnection.on('error', this.emit.bind(this, 'error'));

if (typeof processor === 'function') {
this.processFn = processor;
} else {
// SANDBOXED
const supportedFileTypes = ['.js', '.ts', '.flow'];
const processorFile =
processor +
(supportedFileTypes.includes(path.extname(processor)) ? '' : '.js');

if (!fs.existsSync(processorFile)) {
// TODO are we forced to use sync api here?
throw new Error(`File ${processorFile} does not exist`);
if (processor) {
if (typeof processor === 'function') {
this.processFn = processor;
} else {
// SANDBOXED
const supportedFileTypes = ['.js', '.ts', '.flow'];
const processorFile =
processor +
(supportedFileTypes.includes(path.extname(processor)) ? '' : '.js');

if (!fs.existsSync(processorFile)) {
// TODO are we forced to use sync api here?
throw new Error(`File ${processorFile} does not exist`);
}

this.childPool = this.childPool || new ChildPool();
this.processFn = sandbox<T, R, N>(processor, this.childPool).bind(this);
}
this.timerManager = new TimerManager();

this.childPool = this.childPool || new ChildPool();
this.processFn = sandbox<T, R, N>(processor, this.childPool).bind(this);
/* tslint:disable: no-floating-promises */
this.run().catch(error => {
console.error(error);
});
}
this.timerManager = new TimerManager();

/* tslint:disable: no-floating-promises */
this.run().catch(error => {
console.error(error);
});

this.on('error', err => console.error(err));
}
Expand Down Expand Up @@ -412,7 +414,7 @@ export class Worker<
return closePoolPromise;
})
.finally(() => client.disconnect())
.finally(() => this.timerManager.clearAllTimers())
.finally(() => this.timerManager && this.timerManager.clearAllTimers())
.finally(() => this.emit('closed'));
})();
return this.closing;
Expand Down
44 changes: 44 additions & 0 deletions src/test/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1630,4 +1630,48 @@ describe('workers', function() {
});
*/
});

describe('Manually process jobs', () => {
it('should allow to complete jobs manually', async () => {
const worker = new Worker(queueName);
const token = 'my-token';

await queue.add('test', { foo: 'bar' });

const job = (await worker.getNextJob(token)) as Job;

const isActive = await job.isActive();
expect(isActive).to.be.equal(true);

await job.moveToCompleted('return value', token);

const isCompleted = await job.isCompleted();

expect(isCompleted).to.be.equal(true);

await worker.close();
});

it('should allow to fail jobs manually', async () => {
const worker = new Worker(queueName);
const token = 'my-token';

await queue.add('test', { foo: 'bar' });

const job = (await worker.getNextJob(token)) as Job;

const isActive = await job.isActive();
expect(isActive).to.be.equal(true);

await job.moveToFailed(new Error('job failed for some reason'), token);

const isCompleted = await job.isCompleted();
const isFailed = await job.isFailed();

expect(isCompleted).to.be.equal(false);
expect(isFailed).to.be.equal(true);

await worker.close();
});
});
});

0 comments on commit e42bfd2

Please sign in to comment.