Skip to content

Commit

Permalink
#3667 - Bull Scheduler - Ensure Delayed Job (#3995)
Browse files Browse the repository at this point in the history
Moving the latest changes from Bull Scheduler to release 2.1.
  • Loading branch information
andrewsignori-aot authored Nov 25, 2024
1 parent 9de4f64 commit d81f0f4
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { QueueNames } from "@sims/utilities";
import { ConfigService } from "@sims/utilities/config";
import { InjectLogger } from "@sims/utilities/logger";
import { CronRepeatOptions, Queue } from "bull";
import { v4 as uuid } from "uuid";
import * as cronParser from "cron-parser";

export abstract class BaseScheduler<T> implements OnApplicationBootstrap {
constructor(
Expand Down Expand Up @@ -84,27 +86,82 @@ export abstract class BaseScheduler<T> implements OnApplicationBootstrap {
await this.schedulerQueue.obliterate({ force: true });
return;
}
await this.deleteOldRepeatableJobs();
// Add the cron to the queue.
await this.schedulerQueue.add(await this.payload());
// Acquires a lock to avoid concurrent issues while checking the queues on Redis.
// This prevents concurrency between queue-consumers instances and also between the
// schedulers bootstrap executions (when all bootstrap methods were executed at the same time
// it caused issues with the ioredis connection).
await this.queueService.acquireGlobalQueueLock(async () => {
this.logger.log(
`Starting verification to ensure the next delayed job is created for queue ${this.schedulerQueue.name}.`,
);
try {
this.logger.log(`Check if current job state is paused.`);
// Check if the job is paused to avoid creating new delayed jobs.
const isPaused = await this.schedulerQueue.isPaused();
if (isPaused) {
return;
}
await this.ensureNextDelayedJobCreation();
} catch (error: unknown) {
this.logger.error(`Error while ensuring next delayed job.`, error);
throw error;
}
});
}

/**
* Check if there is any old cron job (i.e whenever there is a
* change in cron option, then a new job is created the old job
* will be still there in the queue) and delete it and add the
* new job to the queue.
* Note: If there is an old retrying job, it won't be deleted,
* as "getRepeatableJobs" will not fetch retrying jobs.
* Ensures a scheduled job will have a delayed job created with the
* next expected scheduled time based on the configured cron expression.
*/
private async deleteOldRepeatableJobs(): Promise<void> {
const getAllRepeatableJobs = await this.schedulerQueue.getRepeatableJobs();
const cronRepeatOption = await this.queueCronConfiguration();
getAllRepeatableJobs.forEach((job) => {
if (job.cron !== cronRepeatOption.cron) {
this.schedulerQueue.removeRepeatableByKey(job.key);
private async ensureNextDelayedJobCreation(): Promise<void> {
this.logger.log(`Getting list of delayed jobs.`);
const delayedJobs = await this.schedulerQueue.getDelayed();
const expectedJobMilliseconds =
await this.getNexSchedulerExecutionMilliseconds();
// Check if there is a delayed job with the expected scheduled time.
const expectedDelayedJob = delayedJobs.find((delayedJob) => {
const delayedJobMilliseconds =
delayedJob.opts.delay + delayedJob.timestamp;
return expectedJobMilliseconds === delayedJobMilliseconds;
});
// If the only delayed job is the expected one, no further verifications are needed.
if (expectedDelayedJob && delayedJobs.length === 1) {
this.logger.log(`Delayed job was already created as expected.`);
return;
}
// Remove any non expected delayed job.
for (const delayedJob of delayedJobs) {
if (!expectedDelayedJob || delayedJob !== expectedDelayedJob) {
this.logger.log(`Removing job ${delayedJob.id}.`);
await delayedJob.remove();
}
}
// The expected delayed job was found and any
// extra delayed jobs were removed.
if (expectedDelayedJob) {
return;
}
// Creating a unique job ID ensures that the delayed jobs
// will be created even if they were already promoted.
const uniqueJobId = `${this.schedulerQueue.name}:${uuid()}`;
this.logger.log(`Creating new delayed job using unique id ${uniqueJobId}.`);
await this.schedulerQueue.add(await this.payload(), {
jobId: uniqueJobId,
});
this.logger.log(`New delayed job id ${uniqueJobId} was created.`);
}

/**
* Gets the next scheduled date milliseconds for the given cron expression.
* @param cron cron expression string.
* @returns the next date the scheduler will be executed.
*/
async getNexSchedulerExecutionMilliseconds(): Promise<number> {
const repeatOptions = await this.queueCronConfiguration();
const result = cronParser.parseExpression(repeatOptions.cron, {
utc: true,
});
return result.next().getTime();
}

@InjectLogger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import { InjectRepository } from "@nestjs/typeorm";
import { QueueConfiguration } from "@sims/sims-db";
import { QueueNames } from "@sims/utilities";
import Bull, { AdvancedSettings } from "bull";
import { Repository } from "typeorm";
import { DataSource, Repository } from "typeorm";
import { QueueModel } from "./model/queue.model";

@Injectable()
export class QueueService {
private queueConfiguration: QueueConfiguration[] = undefined;
constructor(
private readonly dataSource: DataSource,
@InjectRepository(QueueConfiguration)
private queueConfigurationRepo: Repository<QueueConfiguration>,
) {}
Expand Down Expand Up @@ -125,4 +126,25 @@ export class QueueService {
const queueConfig = await this.queueConfigurationDetails(queueName);
return queueConfig.queueConfiguration.amountHoursAssessmentRetry;
}

/**
* Acquires a database lock that can be used for any task that requires
* a single process to be executed exclusively at a time, even in different
* queue-consumers instances.
* @param callback method to be executed inside the lock.
*/
async acquireGlobalQueueLock(callback: () => Promise<void>): Promise<void> {
await this.dataSource.transaction(async (entityManager) => {
// Selects the first record to be used as a lock.
await entityManager.getRepository(QueueConfiguration).find({
select: {
id: true,
},
lock: { mode: "pessimistic_write" },
order: { id: "ASC" },
take: 1,
});
await callback();
});
}
}
19 changes: 15 additions & 4 deletions sources/packages/backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion sources/packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@
"clamscan": "^2.2.1",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"cron-parser": "^4.9.0",
"dayjs": "^1.10.4",
"dotenv": "^8.2.0",
"express-basic-auth": "^1.2.1",
"helmet": "^7.1.0",
"ioredis": "^5.2.4",
"ioredis": "^5.4.1",
"jsonpath": "^1.1.1",
"jwt-decode": "^3.1.2",
"papaparse": "^5.4.1",
Expand Down Expand Up @@ -109,6 +110,7 @@
"@types/passport-jwt": "^3.0.4",
"@types/pg": "^8.10.9",
"@types/supertest": "^2.0.10",
"@types/uuid": "^10.0.0",
"@typescript-eslint/eslint-plugin": "^5.39.0",
"@typescript-eslint/parser": "^5.39.0",
"copy-webpack-plugin": "^11.0.0",
Expand Down

0 comments on commit d81f0f4

Please sign in to comment.