From d81f0f473fddd1f76428aef20bd179e974414267 Mon Sep 17 00:00:00 2001 From: Andrew Boni Signori <61259237+andrewsignori-aot@users.noreply.github.com> Date: Mon, 25 Nov 2024 12:03:24 -0800 Subject: [PATCH] #3667 - Bull Scheduler - Ensure Delayed Job (#3995) Moving the latest changes from Bull Scheduler to release 2.1. --- .../processors/schedulers/base-scheduler.ts | 87 +++++++++++++++---- .../libs/services/src/queue/queue.service.ts | 24 ++++- sources/packages/backend/package-lock.json | 19 +++- sources/packages/backend/package.json | 4 +- 4 files changed, 113 insertions(+), 21 deletions(-) diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/base-scheduler.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/base-scheduler.ts index 863b791057..7138ff9aba 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/base-scheduler.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/base-scheduler.ts @@ -4,6 +4,8 @@ import { QueueNames } from "@sims/utilities"; import { ConfigService } from "@sims/utilities/config"; import { InjectLogger } from "@sims/utilities/logger"; import { CronRepeatOptions, Queue } from "bull"; +import { v4 as uuid } from "uuid"; +import * as cronParser from "cron-parser"; export abstract class BaseScheduler implements OnApplicationBootstrap { constructor( @@ -84,27 +86,82 @@ export abstract class BaseScheduler implements OnApplicationBootstrap { await this.schedulerQueue.obliterate({ force: true }); return; } - await this.deleteOldRepeatableJobs(); - // Add the cron to the queue. - await this.schedulerQueue.add(await this.payload()); + // Acquires a lock to avoid concurrent issues while checking the queues on Redis. + // This prevents concurrency between queue-consumers instances and also between the + // schedulers bootstrap executions (when all bootstrap methods were executed at the same time + // it caused issues with the ioredis connection). + await this.queueService.acquireGlobalQueueLock(async () => { + this.logger.log( + `Starting verification to ensure the next delayed job is created for queue ${this.schedulerQueue.name}.`, + ); + try { + this.logger.log(`Check if current job state is paused.`); + // Check if the job is paused to avoid creating new delayed jobs. + const isPaused = await this.schedulerQueue.isPaused(); + if (isPaused) { + return; + } + await this.ensureNextDelayedJobCreation(); + } catch (error: unknown) { + this.logger.error(`Error while ensuring next delayed job.`, error); + throw error; + } + }); } /** - * Check if there is any old cron job (i.e whenever there is a - * change in cron option, then a new job is created the old job - * will be still there in the queue) and delete it and add the - * new job to the queue. - * Note: If there is an old retrying job, it won't be deleted, - * as "getRepeatableJobs" will not fetch retrying jobs. + * Ensures a scheduled job will have a delayed job created with the + * next expected scheduled time based on the configured cron expression. */ - private async deleteOldRepeatableJobs(): Promise { - const getAllRepeatableJobs = await this.schedulerQueue.getRepeatableJobs(); - const cronRepeatOption = await this.queueCronConfiguration(); - getAllRepeatableJobs.forEach((job) => { - if (job.cron !== cronRepeatOption.cron) { - this.schedulerQueue.removeRepeatableByKey(job.key); + private async ensureNextDelayedJobCreation(): Promise { + this.logger.log(`Getting list of delayed jobs.`); + const delayedJobs = await this.schedulerQueue.getDelayed(); + const expectedJobMilliseconds = + await this.getNexSchedulerExecutionMilliseconds(); + // Check if there is a delayed job with the expected scheduled time. + const expectedDelayedJob = delayedJobs.find((delayedJob) => { + const delayedJobMilliseconds = + delayedJob.opts.delay + delayedJob.timestamp; + return expectedJobMilliseconds === delayedJobMilliseconds; + }); + // If the only delayed job is the expected one, no further verifications are needed. + if (expectedDelayedJob && delayedJobs.length === 1) { + this.logger.log(`Delayed job was already created as expected.`); + return; + } + // Remove any non expected delayed job. + for (const delayedJob of delayedJobs) { + if (!expectedDelayedJob || delayedJob !== expectedDelayedJob) { + this.logger.log(`Removing job ${delayedJob.id}.`); + await delayedJob.remove(); } + } + // The expected delayed job was found and any + // extra delayed jobs were removed. + if (expectedDelayedJob) { + return; + } + // Creating a unique job ID ensures that the delayed jobs + // will be created even if they were already promoted. + const uniqueJobId = `${this.schedulerQueue.name}:${uuid()}`; + this.logger.log(`Creating new delayed job using unique id ${uniqueJobId}.`); + await this.schedulerQueue.add(await this.payload(), { + jobId: uniqueJobId, + }); + this.logger.log(`New delayed job id ${uniqueJobId} was created.`); + } + + /** + * Gets the next scheduled date milliseconds for the given cron expression. + * @param cron cron expression string. + * @returns the next date the scheduler will be executed. + */ + async getNexSchedulerExecutionMilliseconds(): Promise { + const repeatOptions = await this.queueCronConfiguration(); + const result = cronParser.parseExpression(repeatOptions.cron, { + utc: true, }); + return result.next().getTime(); } @InjectLogger() diff --git a/sources/packages/backend/libs/services/src/queue/queue.service.ts b/sources/packages/backend/libs/services/src/queue/queue.service.ts index 1a860bb69a..86e3517957 100644 --- a/sources/packages/backend/libs/services/src/queue/queue.service.ts +++ b/sources/packages/backend/libs/services/src/queue/queue.service.ts @@ -3,13 +3,14 @@ import { InjectRepository } from "@nestjs/typeorm"; import { QueueConfiguration } from "@sims/sims-db"; import { QueueNames } from "@sims/utilities"; import Bull, { AdvancedSettings } from "bull"; -import { Repository } from "typeorm"; +import { DataSource, Repository } from "typeorm"; import { QueueModel } from "./model/queue.model"; @Injectable() export class QueueService { private queueConfiguration: QueueConfiguration[] = undefined; constructor( + private readonly dataSource: DataSource, @InjectRepository(QueueConfiguration) private queueConfigurationRepo: Repository, ) {} @@ -125,4 +126,25 @@ export class QueueService { const queueConfig = await this.queueConfigurationDetails(queueName); return queueConfig.queueConfiguration.amountHoursAssessmentRetry; } + + /** + * Acquires a database lock that can be used for any task that requires + * a single process to be executed exclusively at a time, even in different + * queue-consumers instances. + * @param callback method to be executed inside the lock. + */ + async acquireGlobalQueueLock(callback: () => Promise): Promise { + await this.dataSource.transaction(async (entityManager) => { + // Selects the first record to be used as a lock. + await entityManager.getRepository(QueueConfiguration).find({ + select: { + id: true, + }, + lock: { mode: "pessimistic_write" }, + order: { id: "ASC" }, + take: 1, + }); + await callback(); + }); + } } diff --git a/sources/packages/backend/package-lock.json b/sources/packages/backend/package-lock.json index 5ad28f9329..4633915762 100644 --- a/sources/packages/backend/package-lock.json +++ b/sources/packages/backend/package-lock.json @@ -34,11 +34,12 @@ "clamscan": "^2.2.1", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", + "cron-parser": "^4.9.0", "dayjs": "^1.10.4", "dotenv": "^8.2.0", "express-basic-auth": "^1.2.1", "helmet": "^7.1.0", - "ioredis": "^5.2.4", + "ioredis": "^5.4.1", "jsonpath": "^1.1.1", "jwt-decode": "^3.1.2", "papaparse": "^5.4.1", @@ -76,6 +77,7 @@ "@types/passport-jwt": "^3.0.4", "@types/pg": "^8.10.9", "@types/supertest": "^2.0.10", + "@types/uuid": "^10.0.0", "@typescript-eslint/eslint-plugin": "^5.39.0", "@typescript-eslint/parser": "^5.39.0", "copy-webpack-plugin": "^11.0.0", @@ -5030,6 +5032,13 @@ "@types/superagent": "*" } }, + "node_modules/@types/uuid": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/validator": { "version": "13.11.9", "resolved": "https://registry.npmjs.org/@types/validator/-/validator-13.11.9.tgz", @@ -6801,6 +6810,7 @@ "version": "4.9.0", "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", + "license": "MIT", "dependencies": { "luxon": "^3.2.1" }, @@ -8638,9 +8648,10 @@ } }, "node_modules/ioredis": { - "version": "5.3.2", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", - "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "version": "5.4.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.4.1.tgz", + "integrity": "sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==", + "license": "MIT", "dependencies": { "@ioredis/commands": "^1.1.1", "cluster-key-slot": "^1.1.0", diff --git a/sources/packages/backend/package.json b/sources/packages/backend/package.json index 6af3282e58..92aca45f87 100644 --- a/sources/packages/backend/package.json +++ b/sources/packages/backend/package.json @@ -67,11 +67,12 @@ "clamscan": "^2.2.1", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", + "cron-parser": "^4.9.0", "dayjs": "^1.10.4", "dotenv": "^8.2.0", "express-basic-auth": "^1.2.1", "helmet": "^7.1.0", - "ioredis": "^5.2.4", + "ioredis": "^5.4.1", "jsonpath": "^1.1.1", "jwt-decode": "^3.1.2", "papaparse": "^5.4.1", @@ -109,6 +110,7 @@ "@types/passport-jwt": "^3.0.4", "@types/pg": "^8.10.9", "@types/supertest": "^2.0.10", + "@types/uuid": "^10.0.0", "@typescript-eslint/eslint-plugin": "^5.39.0", "@typescript-eslint/parser": "^5.39.0", "copy-webpack-plugin": "^11.0.0",