diff --git a/src/classes/job.ts b/src/classes/job.ts index 7cdfc9d842..b236139552 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -710,11 +710,13 @@ export class Job< token, delay, ); - (multi).moveToDelayed(args); + this.scripts.execCommand(multi, 'moveToDelayed', args); command = 'moveToDelayed'; } else { // Retry immediately - (multi).retryJob( + this.scripts.execCommand( + multi, + 'retryJob', this.scripts.retryJobArgs(this.id, this.opts.lifo, token), ); command = 'retryJob'; @@ -732,7 +734,8 @@ export class Job< token, fetchNext, ); - (multi).moveToFinished(args); + + this.scripts.execCommand(multi, 'moveToFinished', args); finishedOn = args[this.scripts.moveToFinishedKeys.length + 1] as number; command = 'moveToFinished'; } @@ -1287,7 +1290,7 @@ export class Job< err?.message, ); - (multi).saveStacktrace(args); + this.scripts.execCommand(multi, 'saveStacktrace', args); } } diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index c068765759..9ed6b469ef 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -11,6 +11,7 @@ import { isRedisCluster, isRedisInstance, isRedisVersionLowerThan, + readPackageJson, } from '../utils'; import * as scripts from '../scripts'; @@ -195,13 +196,17 @@ export class RedisConnection extends EventEmitter { return this.initializing; } - protected loadCommands(providedScripts?: Record): void { + protected loadCommands( + version: string, + providedScripts?: Record, + ): void { const finalScripts = providedScripts || (scripts as Record); for (const property in finalScripts as Record) { // Only define the command if not already defined - if (!(this._client)[finalScripts[property].name]) { - (this._client).defineCommand(finalScripts[property].name, { + const commandName = `${finalScripts[property].name}:${version}`; + if (!(this._client)[commandName]) { + (this._client).defineCommand(commandName, { numberOfKeys: finalScripts[property].keys, lua: finalScripts[property].content, }); @@ -223,9 +228,11 @@ export class RedisConnection extends EventEmitter { this._client.on('ready', this.handleClientReady); + const { version } = readPackageJson(); + await RedisConnection.waitUntilReady(this._client); - this.loadCommands(); + this.loadCommands(version); if (this._client['status'] !== 'end') { this.version = await this.getRedisVersion(); diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index cd0dcdfdaa..cafcf97074 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -34,17 +34,27 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; +import { + array2obj, + getParentKey, + isRedisVersionLowerThan, + readPackageJson, +} from '../utils'; import { ChainableCommander } from 'ioredis'; +import { read } from 'fs'; export type JobData = [JobJsonRaw | number, string?]; export class Scripts { + protected version; + moveToFinishedKeys: (string | undefined)[]; constructor(protected queue: MinimalQueue) { const queueKeys = this.queue.keys; + this.version = readPackageJson().version; + this.moveToFinishedKeys = [ queueKeys.wait, queueKeys.active, @@ -63,13 +73,22 @@ export class Scripts { ]; } + public execCommand( + client: RedisClient | ChainableCommander, + commandName: string, + args: any[], + ) { + const commandNameWithVersion = `${commandName}:${this.version}`; + return (client)[commandNameWithVersion](args); + } + async isJobInList(listKey: string, jobId: string): Promise { const client = await this.queue.client; let result; if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) { - result = await (client).isJobInList([listKey, jobId]); + result = this.execCommand(client, 'isJobInList', [listKey, jobId]); } else { - result = await (client).lpos(listKey, jobId); + result = await client.lpos(listKey, jobId); } return Number.isInteger(result); } @@ -91,8 +110,7 @@ export class Scripts { ]; keys.push(pack(args), job.data, encodedOpts); - - return (client).addDelayedJob(keys); + return this.execCommand(client, 'addDelayedJob', keys); } protected addPrioritizedJob( @@ -114,8 +132,7 @@ export class Scripts { ]; keys.push(pack(args), job.data, encodedOpts); - - return (client).addPrioritizedJob(keys); + return this.execCommand(client, 'addPrioritizedJob', keys); } protected addParentJob( @@ -133,8 +150,7 @@ export class Scripts { ]; keys.push(pack(args), job.data, encodedOpts); - - return (client).addParentJob(keys); + return this.execCommand(client, 'addParentJob', keys); } protected addStandardJob( @@ -156,8 +172,7 @@ export class Scripts { ]; keys.push(pack(args), job.data, encodedOpts); - - return (client).addStandardJob(keys); + return this.execCommand(client, 'addStandardJob', keys); } async addJob( @@ -257,8 +272,7 @@ export class Scripts { const client = await this.queue.client; const args = this.pauseArgs(pause); - - return (client).pause(args); + return this.execCommand(client, 'pause', args); } protected addRepeatableJobArgs( @@ -298,8 +312,7 @@ export class Scripts { opts, legacyCustomKey, ); - - return (client).addRepeatableJob(args); + return this.execCommand(client, 'addRepeatableJob', args); } async addJobScheduler( @@ -315,8 +328,7 @@ export class Scripts { queueKeys.delayed, ]; const args = [nextMillis, pack(opts), jobSchedulerId, queueKeys['']]; - - return (client).addJobScheduler(keys.concat(args)); + return this.execCommand(client, 'addJobScheduler', keys.concat(args)); } async updateRepeatableJobMillis( @@ -331,7 +343,7 @@ export class Scripts { customKey, legacyCustomKey, ]; - return (client).updateRepeatableJobMillis(args); + return this.execCommand(client, 'updateRepeatableJobMillis', args); } async updateJobSchedulerNextMillis( @@ -382,8 +394,7 @@ export class Scripts { repeatConcatOptions, repeatJobKey, ); - - return (client).removeRepeatable(args); + return this.execCommand(client, 'removeRepeatable', args); } async removeJobScheduler(jobSchedulerId: string): Promise { @@ -395,7 +406,7 @@ export class Scripts { const args = [jobSchedulerId, queueKeys['']]; - return (client).removeJobScheduler(keys.concat(args)); + return this.execCommand(client, 'removeJobScheduler', keys.concat(args)); } protected removeArgs( @@ -415,8 +426,7 @@ export class Scripts { const client = await this.queue.client; const args = this.removeArgs(jobId, removeChildren); - - const result = await (client).removeJob(args); + const result = await this.execCommand(client, 'removeJob', args); if (result < 0) { throw this.finishedErrors({ @@ -443,7 +453,7 @@ export class Scripts { duration, jobId, ]; - return (client).extendLock(args); + return this.execCommand(client, 'extendLock', args); } async updateData( @@ -455,7 +465,11 @@ export class Scripts { const keys = [this.queue.toKey(job.id)]; const dataJson = JSON.stringify(data); - const result = await (client).updateData(keys.concat([dataJson])); + const result = await this.execCommand( + client, + 'updateData', + keys.concat([dataJson]), + ); if (result < 0) { throw this.finishedErrors({ @@ -479,7 +493,9 @@ export class Scripts { ]; const progressJson = JSON.stringify(progress); - const result = await (client).updateProgress( + const result = await this.execCommand( + client, + 'updateProgress', keys.concat([jobId, progressJson]), ); @@ -504,7 +520,9 @@ export class Scripts { this.queue.toKey(jobId) + ':logs', ]; - const result = await (client).addLog( + const result = await this.execCommand( + client, + 'addLog', keys.concat([jobId, logRow, keepLogs ? keepLogs : '']), ); @@ -591,7 +609,7 @@ export class Scripts { ) { const client = await this.queue.client; - const result = await (client).moveToFinished(args); + const result = await this.execCommand(client, 'moveToFinished', args); if (result < 0) { throw this.finishedErrors({ code: result, @@ -626,7 +644,7 @@ export class Scripts { const client = await this.queue.client; const args = this.drainArgs(delayed); - return (client).drain(args); + return this.execCommand(client, 'drain', args); } private removeChildDependencyArgs( @@ -649,7 +667,11 @@ export class Scripts { const client = await this.queue.client; const args = this.removeChildDependencyArgs(jobId, parentKey); - const result = await (client).removeChildDependency(args); + const result = await this.execCommand( + client, + 'removeChildDependency', + args, + ); switch (result) { case 0: @@ -693,7 +715,7 @@ export class Scripts { const client = await this.queue.client; const args = this.getRangesArgs(types, start, end, asc); - return (client).getRanges(args); + return await this.execCommand(client, 'getRanges', args); } private getCountsArgs(types: JobType[]): (string | number)[] { @@ -713,7 +735,7 @@ export class Scripts { const client = await this.queue.client; const args = this.getCountsArgs(types); - return (client).getCounts(args); + return await this.execCommand(client, 'getCounts', args); } protected getCountsPerPriorityArgs( @@ -735,7 +757,7 @@ export class Scripts { const client = await this.queue.client; const args = this.getCountsPerPriorityArgs(priorities); - return (client).getCountsPerPriority(args); + return await this.execCommand(client, 'getCountsPerPriority', args); } moveToCompletedArgs( @@ -788,7 +810,9 @@ export class Scripts { return this.queue.toKey(key); }); - return (client).isFinished( + return this.execCommand( + client, + 'isFinished', keys.concat([jobId, returnValue ? '1' : '']), ); } @@ -810,16 +834,16 @@ export class Scripts { }); if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) { - return (client).getState(keys.concat([jobId])); + return this.execCommand(client, 'getState', keys.concat([jobId])); } - return (client).getStateV2(keys.concat([jobId])); + return this.execCommand(client, 'getStateV2', keys.concat([jobId])); } async changeDelay(jobId: string, delay: number): Promise { const client = await this.queue.client; const args = this.changeDelayArgs(jobId, delay); - const result = await (client).changeDelay(args); + const result = await this.execCommand(client, 'changeDelay', args); if (result < 0) { throw this.finishedErrors({ code: result, @@ -856,7 +880,8 @@ export class Scripts { const client = await this.queue.client; const args = this.changePriorityArgs(jobId, priority, lifo); - const result = await (client).changePriority(args); + + const result = await this.execCommand(client, 'changePriority', args); if (result < 0) { throw this.finishedErrors({ code: result, @@ -961,7 +986,7 @@ export class Scripts { const client = await this.queue.client; const args = this.isMaxedArgs(); - return !!(await (client).isMaxed(args)); + return !!(await this.execCommand(client, 'isMaxed', args)); } async moveToDelayed( @@ -974,7 +999,8 @@ export class Scripts { const client = await this.queue.client; const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); - const result = await (client).moveToDelayed(args); + + const result = await this.execCommand(client, 'moveToDelayed', args); if (result < 0) { throw this.finishedErrors({ code: result, @@ -1004,7 +1030,11 @@ export class Scripts { const client = await this.queue.client; const args = this.moveToWaitingChildrenArgs(jobId, token, opts); - const result = await (client).moveToWaitingChildren(args); + const result = await this.execCommand( + client, + 'moveToWaitingChildren', + args, + ); switch (result) { case 0: @@ -1031,7 +1061,7 @@ export class Scripts { const client = await this.queue.client; const args = this.getRateLimitTtlArgs(maxJobs); - return (client).getRateLimitTtl(args); + return this.execCommand(client, 'getRateLimitTtl', args); } /** @@ -1046,7 +1076,7 @@ export class Scripts { ): Promise { const client = await this.queue.client; - return (client).cleanJobsInSet([ + return this.execCommand(client, 'cleanJobsInSet', [ this.queue.toKey(set), this.queue.toKey('events'), this.queue.toKey('repeat'), @@ -1117,7 +1147,7 @@ export class Scripts { const args = this.moveJobsToWaitArgs(state, count, timestamp); - return (client).moveJobsToWait(args); + return this.execCommand(client, 'moveJobsToWait', args); } async promoteJobs(count = 1000): Promise { @@ -1125,7 +1155,7 @@ export class Scripts { const args = this.moveJobsToWaitArgs('delayed', count, Number.MAX_VALUE); - return (client).moveJobsToWait(args); + return this.execCommand(client, 'moveJobsToWait', args); } /** @@ -1165,7 +1195,11 @@ export class Scripts { state, ]; - const result = await (client).reprocessJob(keys.concat(args)); + const result = await this.execCommand( + client, + 'reprocessJob', + keys.concat(args), + ); switch (result) { case 1: @@ -1209,7 +1243,9 @@ export class Scripts { }), ]; - const result = await (client).moveToActive( + const result = await this.execCommand( + client, + 'moveToActive', (<(string | number | boolean | Buffer)[]>keys).concat(args), ); @@ -1233,7 +1269,7 @@ export class Scripts { const args = [this.queue.toKey(''), jobId]; - const code = await (client).promote(keys.concat(args)); + const code = await this.execCommand(client, 'promote', keys.concat(args)); if (code < 0) { throw this.finishedErrors({ code, @@ -1281,7 +1317,7 @@ export class Scripts { const args = this.moveStalledJobsToWaitArgs(); - return (client).moveStalledJobsToWait(args); + return this.execCommand(client, 'moveStalledJobsToWait', args); } /** @@ -1312,7 +1348,11 @@ export class Scripts { const args = [jobId, token, this.queue.toKey(jobId)]; - const pttl = await (client).moveJobFromActiveToWait(keys.concat(args)); + const pttl = await this.execCommand( + client, + 'moveJobFromActiveToWait', + keys.concat(args), + ); return pttl < 0 ? 0 : pttl; } @@ -1326,7 +1366,11 @@ export class Scripts { ]; const args = [opts.count, opts.force ? 'force' : null]; - const result = await (client).obliterate(keys.concat(args)); + const result = await this.execCommand( + client, + 'obliterate', + keys.concat(args), + ); if (result < 0) { switch (result) { case -1: @@ -1380,9 +1424,12 @@ export class Scripts { args.push(1); } - [cursor, offset, items, total, rawJobs] = await (client).paginate( + [cursor, offset, items, total, rawJobs] = await this.execCommand( + client, + 'paginate', keys.concat(args), ); + page = page.concat(items); if (rawJobs && rawJobs.length) {