Skip to content

Commit

Permalink
feat: expose addJobLog and updateJobProgress to the Queue instance (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Sep 29, 2023
1 parent ba77e06 commit 2056939
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 33 deletions.
61 changes: 40 additions & 21 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,41 @@ export class Job<
}
}

/**
* addJobLog
*
* @param queue Queue instance
* @param jobId Job id
* @param logRow Log row
* @param keepLogs optional maximum number of logs to keep
*
* @returns The total number of log entries for this job so far.
*/
static async addJobLog(
queue: MinimalQueue,
jobId: string,
logRow: string,
keepLogs?: number,
): Promise<number> {
const client = await queue.client;
const logsKey = queue.toKey(jobId) + ':logs';

const multi = client.multi();

multi.rpush(logsKey, logRow);

if (keepLogs) {
multi.ltrim(logsKey, -keepLogs, -1);
}

const result = (await multi.exec()) as [
[Error, number],
[Error, string] | undefined,
];

return keepLogs ? Math.min(keepLogs, result[0][1]) : result[0][1];
}

toJSON() {
const { queue, scripts, ...withoutQueueAndScripts } = this;
return withoutQueueAndScripts;
Expand Down Expand Up @@ -451,36 +486,20 @@ export class Job<
*
* @param progress - number or object to be saved as progress.
*/
updateProgress(progress: number | object): Promise<void> {
async updateProgress(progress: number | object): Promise<void> {
this.progress = progress;
return this.scripts.updateProgress(this, progress);
await this.scripts.updateProgress(this.id, progress);
this.queue.emit('progress', this, progress);
}

/**
* Logs one row of log data.
*
* @param logRow - string with log data to be logged.
* @returns The total number of log entries for this job so far.
*/
async log(logRow: string): Promise<number> {
const client = await this.queue.client;
const logsKey = this.toKey(this.id) + ':logs';

const multi = client.multi();

multi.rpush(logsKey, logRow);

if (this.opts.keepLogs) {
multi.ltrim(logsKey, -this.opts.keepLogs, -1);
}

const result = (await multi.exec()) as [
[Error, number],
[Error, string] | undefined,
];

return this.opts.keepLogs
? Math.min(this.opts.keepLogs, result[0][1])
: result[0][1];
return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs);
}

/**
Expand Down
30 changes: 30 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,36 @@ export class Queue<
return this.scripts.remove(jobId, removeChildren);
}

/**
* Updates the given job's progress.
*
* @param jobId - The id of the job to update
* @param progress - number or object to be saved as progress.
*/
async updateJobProgress(
jobId: string,
progress: number | object,
): Promise<void> {
return this.scripts.updateProgress(jobId, progress);
}

/**
* Logs one row of job's log data.
*
* @param jobId - The job id to log against.
* @param logRow - string with log data to be logged.
* @param keepLogs - max number of log entries to keep (0 for unlimited).
*
* @returns The total number of log entries for this job so far.
*/
async addJobLog(
jobId: string,
logRow: string,
keepLogs?: number,
): Promise<number> {
return Job.addJobLog(this, jobId, logRow, keepLogs);
}

/**
* Drains the queue, i.e., removes all jobs that are waiting
* or delayed, but not active, completed or failed.
Expand Down
10 changes: 4 additions & 6 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,23 +224,21 @@ export class Scripts {
}

async updateProgress<T = any, R = any, N extends string = string>(
job: MinimalJob<T, R, N>,
jobId: string,
progress: number | object,
): Promise<void> {
const client = await this.queue.client;

const keys = [this.queue.toKey(job.id), this.queue.keys.events];
const keys = [this.queue.toKey(jobId), this.queue.keys.events];
const progressJson = JSON.stringify(progress);

const result = await (<any>client).updateProgress(
keys.concat([job.id, progressJson]),
keys.concat([jobId, progressJson]),
);

if (result < 0) {
throw this.finishedErrors(result, job.id, 'updateProgress');
throw this.finishedErrors(result, jobId, 'updateProgress');
}

this.queue.emit('progress', job, progress);
}

protected moveToFinishedArgs<T = any, R = any, N extends string = string>(
Expand Down
42 changes: 36 additions & 6 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,31 @@ describe('Job', function () {
it('can set and get progress as number', async function () {
const job = await Job.create(queue, 'test', { foo: 'bar' });
await job.updateProgress(42);
const storedJob = await Job.fromId(queue, job.id);
const storedJob = await Job.fromId(queue, job.id!);
expect(storedJob!.progress).to.be.equal(42);
});

it('can set and get progress as object', async function () {
const job = await Job.create(queue, 'test', { foo: 'bar' });
await job.updateProgress({ total: 120, completed: 40 });
const storedJob = await Job.fromId(queue, job.id);
expect(storedJob.progress).to.eql({ total: 120, completed: 40 });
const storedJob = await Job.fromId(queue, job.id!);
expect(storedJob!.progress).to.eql({ total: 120, completed: 40 });
});

it('cat set progress as number using the Queue instance', async () => {
const job = await Job.create(queue, 'test', { foo: 'bar' });

await queue.updateJobProgress(job.id!, 42);

const storedJob = await Job.fromId(queue, job.id!);
expect(storedJob!.progress).to.be.equal(42);
});

it('cat set progress as object using the Queue instance', async () => {
const job = await Job.create(queue, 'test', { foo: 'bar' });
await queue.updateJobProgress(job.id!, { total: 120, completed: 40 });
const storedJob = await Job.fromId(queue, job.id!);
expect(storedJob!.progress).to.eql({ total: 120, completed: 40 });
});

describe('when job is removed', () => {
Expand Down Expand Up @@ -404,21 +420,35 @@ describe('Job', function () {
const count1 = await job.log(firstLog);
expect(count1).to.be.equal(1);

const logs1 = await queue.getJobLogs(job.id);
const logs1 = await queue.getJobLogs(job.id!);
expect(logs1).to.be.eql({ logs: [firstLog], count: 1 });

const count2 = await job.log(secondLog);
expect(count2).to.be.equal(2);

const logs2 = await queue.getJobLogs(job.id);
const logs2 = await queue.getJobLogs(job.id!);
expect(logs2).to.be.eql({ logs: [firstLog, secondLog], count: 2 });

const count3 = await job.log(thirdLog);
expect(count3).to.be.equal(2);

const logs3 = await queue.getJobLogs(job.id);
const logs3 = await queue.getJobLogs(job.id!);
expect(logs3).to.be.eql({ logs: [secondLog, thirdLog], count: 2 });
});

it('should allow to add job logs from Queue instance', async () => {
const firstLog = 'some log text 1';
const secondLog = 'some log text 2';

const job = await Job.create(queue, 'test', { foo: 'bar' });

await queue.addJobLog(job.id!, firstLog);
await queue.addJobLog(job.id!, secondLog);

const logs = await queue.getJobLogs(job.id!);

expect(logs).to.be.eql({ logs: [firstLog, secondLog], count: 2 });
});
});

describe('.clearLogs', () => {
Expand Down

0 comments on commit 2056939

Please sign in to comment.