diff --git a/lib/commands/addJob-6.lua b/lib/commands/addJob-6.lua index 2e82d004d..9b434bfa2 100644 --- a/lib/commands/addJob-6.lua +++ b/lib/commands/addJob-6.lua @@ -33,6 +33,9 @@ ARGV[9] priority ARGV[10] LIFO ARGV[11] token + ARGV[12] debounce key + ARGV[13] debounceId + ARGV[14] debounceTtl ]] local jobId local jobIdKey @@ -40,6 +43,7 @@ local rcall = redis.call -- Includes --- @include "includes/addJobWithPriority" +--- @include "includes/debounceJob" --- @include "includes/getTargetQueueList" local jobCounter = rcall("INCR", KEYS[4]) @@ -56,10 +60,28 @@ else end end +local debounceKey = ARGV[12] + local opts = cmsgpack.unpack(ARGV[5]) --- Store the job. -rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9]) +local debouncedJobId = debounceJob(ARGV[1], ARGV[13], ARGV[14], + jobId, debounceKey, ARGV[11]) +if debouncedJobId then + return debouncedJobId +end + +local debounceId = ARGV[13] + +local optionalValues = {} + +if debounceId ~= "" then + table.insert(optionalValues, "deid") + table.insert(optionalValues, debounceId) +end + + -- Store the job. +rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", + ARGV[6], "delay", ARGV[7], "priority", ARGV[9], unpack(optionalValues)) -- Check if job is delayed local delayedTimestamp = tonumber(ARGV[8]) diff --git a/lib/commands/cleanJobsInSet-3.lua b/lib/commands/cleanJobsInSet-3.lua index 4daabc1ea..bad7de879 100644 --- a/lib/commands/cleanJobsInSet-3.lua +++ b/lib/commands/cleanJobsInSet-3.lua @@ -6,7 +6,7 @@ KEYS[2] priority key KEYS[3] rate limiter key - ARGV[1] jobId + ARGV[1] prefix key ARGV[2] maxTimestamp ARGV[3] limit the number of jobs to be removed. 0 is unlimited ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed' @@ -16,7 +16,7 @@ local setKey = KEYS[1] local priorityKey = KEYS[2] local rateLimiterKey = KEYS[3] -local jobKeyPrefix = ARGV[1] +local prefixKey = ARGV[1] local maxTimestamp = ARGV[2] local limitStr = ARGV[3] local setName = ARGV[4] @@ -24,6 +24,9 @@ local setName = ARGV[4] local isList = false local rcall = redis.call +-- Includes +--- @include "includes/removeDebounceKey" + if setName == "wait" or setName == "active" or setName == "paused" then isList = true end @@ -75,7 +78,7 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do break end - local jobKey = jobKeyPrefix .. jobId + local jobKey = prefixKey .. jobId if (rcall("EXISTS", jobKey .. ":lock") == 0) then -- Find the right timestamp of the job to compare to maxTimestamp: -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed @@ -98,6 +101,11 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do rcall("ZREM", setKey, jobId) end rcall("ZREM", priorityKey, jobId) + + if setName ~= "completed" and setName ~= "failed" then + removeDebounceKey(prefixKey, jobKey) + end + rcall("DEL", jobKey) rcall("DEL", jobKey .. ":logs") diff --git a/lib/commands/includes/debounceJob.lua b/lib/commands/includes/debounceJob.lua new file mode 100644 index 000000000..b545b0290 --- /dev/null +++ b/lib/commands/includes/debounceJob.lua @@ -0,0 +1,20 @@ +--[[ + Function to debounce a job. +]] + +local function debounceJob(prefixKey, debounceId, ttl, jobId, debounceKey, token) + if debounceId ~= "" then + local debounceKeyExists + if ttl ~= "" then + debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX') + else + debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX') + end + if debounceKeyExists then + local currentDebounceJobId = rcall('GET', debounceKey) + rcall("PUBLISH", prefixKey .. "debounced@" .. token, currentDebounceJobId) + + return currentDebounceJobId + end + end +end \ No newline at end of file diff --git a/lib/commands/includes/removeDebounceKey.lua b/lib/commands/includes/removeDebounceKey.lua new file mode 100644 index 000000000..13922f84a --- /dev/null +++ b/lib/commands/includes/removeDebounceKey.lua @@ -0,0 +1,12 @@ + +--[[ + Function to remove debounce key. +]] + +local function removeDebounceKey(prefixKey, jobKey) + local debounceId = rcall("HGET", jobKey, "deid") + if debounceId then + local debounceKey = prefixKey .. "de:" .. debounceId + rcall("DEL", debounceKey) + end +end diff --git a/lib/commands/includes/removeDebounceKeyIfNeeded.lua b/lib/commands/includes/removeDebounceKeyIfNeeded.lua new file mode 100644 index 000000000..fbc058e20 --- /dev/null +++ b/lib/commands/includes/removeDebounceKeyIfNeeded.lua @@ -0,0 +1,14 @@ +--[[ + Function to remove debounce key if needed. +]] + +local function removeDebounceKeyIfNeeded(prefixKey, debounceId) + if debounceId then + local debounceKey = prefixKey .. "de:" .. debounceId + local pttl = rcall("PTTL", debounceKey) + + if pttl == 0 or pttl == -1 then + rcall("DEL", debounceKey) + end + end +end diff --git a/lib/commands/moveStalledJobsToWait-7.lua b/lib/commands/moveStalledJobsToWait-7.lua index 651cb105d..737795eb2 100644 --- a/lib/commands/moveStalledJobsToWait-7.lua +++ b/lib/commands/moveStalledJobsToWait-7.lua @@ -25,6 +25,7 @@ local rcall = redis.call -- Includes --- @include "includes/batches" --- @include "includes/getTargetQueueList" +--- @include "includes/removeDebounceKeyIfNeeded" local function removeJob(jobId, baseKey) local jobKey = baseKey .. jobId @@ -78,12 +79,13 @@ if(#stalling > 0) then -- If this job has been stalled too many times, such as if it crashes the worker, then fail it. local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1) if(stalledCount > MAX_STALLED_JOB_COUNT) then - local rawOpts = rcall("HGET", jobKey, "opts") - local opts = cjson.decode(rawOpts) + local jobAttributes = rcall("HMGET", jobKey, "opts", "deid") + local opts = cjson.decode(jobAttributes[1]) local removeOnFailType = type(opts["removeOnFail"]) rcall("ZADD", KEYS[4], ARGV[3], jobId) rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit", "finishedOn", ARGV[3]) + removeDebounceKeyIfNeeded(ARGV[2], jobAttributes[2]) rcall("PUBLISH", KEYS[4], '{"jobId":"' .. jobId .. '", "val": "job stalled more than maxStalledCount"}') if removeOnFailType == "number" then diff --git a/lib/commands/moveToFinished-9.lua b/lib/commands/moveToFinished-9.lua index 62ca15da9..3c1e2f55d 100644 --- a/lib/commands/moveToFinished-9.lua +++ b/lib/commands/moveToFinished-9.lua @@ -84,6 +84,7 @@ end -- Includes --- @include "includes/removeLock" +--- @include "includes/removeDebounceKeyIfNeeded" if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists local errorCode = removeLock(KEYS[3], KEYS[8], ARGV[5], ARGV[1]) @@ -96,6 +97,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists if numRemovedElements < 1 then return -3 end + local debounceId = rcall("HGET", KEYS[3], "deid") + removeDebounceKeyIfNeeded(ARGV[9], debounceId) + -- Remove job? local keepJobs = cmsgpack.unpack(ARGV[6]) local maxCount = keepJobs['count'] diff --git a/lib/commands/obliterate-2.lua b/lib/commands/obliterate-2.lua index 03c702339..76240c24a 100644 --- a/lib/commands/obliterate-2.lua +++ b/lib/commands/obliterate-2.lua @@ -18,6 +18,10 @@ local maxCount = tonumber(ARGV[1]) local baseKey = KEYS[2] local rcall = redis.call + +-- Includes +--- @include "includes/removeDebounceKey" + local function getListItems(keyName, max) return rcall('LRANGE', keyName, 0, max - 1) end @@ -26,23 +30,24 @@ local function getZSetItems(keyName, max) return rcall('ZRANGE', keyName, 0, max - 1) end -local function removeJobs(parentKey, keys) +local function removeJobs(baseKey, keys) for i, key in ipairs(keys) do - rcall("DEL", baseKey .. key) - rcall("DEL", baseKey .. key .. ':logs') + local jobKey = baseKey .. key + rcall("DEL", jobKey, jobKey .. ':logs') + removeDebounceKey(baseKey, jobKey) end maxCount = maxCount - #keys end local function removeListJobs(keyName, max) local jobs = getListItems(keyName, max) - removeJobs(keyName, jobs) + removeJobs(baseKey, jobs) rcall("LTRIM", keyName, #jobs, -1) end local function removeZSetJobs(keyName, max) local jobs = getZSetItems(keyName, max) - removeJobs(keyName, jobs) + removeJobs(baseKey, jobs) if (#jobs > 0) then rcall("ZREM", keyName, unpack(jobs)) end end @@ -65,7 +70,7 @@ if (#activeJobs > 0) then end removeLockKeys(activeJobs) -removeJobs(activeKey, activeJobs) +removeJobs(baseKey, activeJobs) rcall("LTRIM", activeKey, #activeJobs, -1) if (maxCount <= 0) then return 1 end diff --git a/lib/commands/removeJob-10.lua b/lib/commands/removeJob-11.lua similarity index 72% rename from lib/commands/removeJob-10.lua rename to lib/commands/removeJob-11.lua index a3bc016e0..e3e8031a3 100644 --- a/lib/commands/removeJob-10.lua +++ b/lib/commands/removeJob-11.lua @@ -3,16 +3,17 @@ In order to be able to remove a job, it must be unlocked. Input: - KEYS[1] 'active', - KEYS[2] 'wait', - KEYS[3] 'delayed', - KEYS[4] 'paused', - KEYS[5] 'completed', - KEYS[6] 'failed', - KEYS[7] 'priority', - KEYS[8] jobId - KEYS[9] job logs + KEYS[1] 'active', + KEYS[2] 'wait', + KEYS[3] 'delayed', + KEYS[4] 'paused', + KEYS[5] 'completed', + KEYS[6] 'failed', + KEYS[7] 'priority', + KEYS[8] jobId key + KEYS[9] job logs KEYS[10] rate limiter index table + KEYS[11] prefix key ARGV[1] jobId ARGV[2] lock token @@ -24,8 +25,12 @@ -- TODO PUBLISH global event 'removed' local rcall = redis.call + +-- Includes +--- @include "includes/removeDebounceKey" + local lockKey = KEYS[8] .. ':lock' -local lock = redis.call("GET", lockKey) +local lock = rcall("GET", lockKey) if not lock then -- or (lock == ARGV[2])) then local jobId = ARGV[1] rcall("LREM", KEYS[1], 0, jobId) @@ -35,6 +40,9 @@ if not lock then -- or (lock == ARGV[2])) then rcall("ZREM", KEYS[5], jobId) rcall("ZREM", KEYS[6], jobId) rcall("ZREM", KEYS[7], jobId) + + removeDebounceKey(KEYS[11], KEYS[8]) + rcall("DEL", KEYS[8]) rcall("DEL", KEYS[9]) diff --git a/lib/job.js b/lib/job.js index d442287eb..1642d3fb3 100644 --- a/lib/job.js +++ b/lib/job.js @@ -57,6 +57,7 @@ const Job = function(queue, name, data, opts) { this.attemptsMade = 0; this.toKey = _.bind(queue.toKey, queue); + this.debounceId = this.opts.debounce ? this.opts.debounce.id : undefined; }; function setDefaultOpts(opts) { @@ -82,7 +83,8 @@ function addJob(queue, client, job) { return scripts.addJob(client, queue, jobData, { lifo: opts.lifo, customJobId: opts.jobId, - priority: opts.priority + priority: opts.priority, + debounce: opts.debounce }); } @@ -182,6 +184,7 @@ Job.prototype.toJSON = function() { failedReason: this.failedReason, stacktrace: this.stacktrace || null, returnvalue: this.returnvalue || null, + debounceId: this.debounceId || null, finishedOn: this.finishedOn || null, processedOn: this.processedOn || null }; @@ -641,6 +644,10 @@ Job.fromJSON = function(queue, json, jobId) { job.returnvalue = getReturnValue(json.returnvalue); } + if (json.deid) { + job.debounceId = json.deid; + } + return job; }; diff --git a/lib/queue.js b/lib/queue.js index c04646d26..19eefc35d 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -276,7 +276,8 @@ const Queue = function Queue(name, url, opts) { 'limiter', 'drained', 'duplicated', - 'progress' + 'progress', + 'de' // debounce key ], key => { keys[key] = this.toKey(key); @@ -418,6 +419,7 @@ Queue.prototype._setupQueueEventListeners = function() { const failedKey = this.keys.failed; const drainedKey = this.keys.drained; const duplicatedKey = this.keys.duplicated; + const debouncedKey = this.keys.de + 'bounced'; const pmessageHandler = (pattern, channel, message) => { const keyAndToken = channel.split('@'); @@ -445,6 +447,12 @@ Queue.prototype._setupQueueEventListeners = function() { } utils.emitSafe(this, 'global:duplicated', message); break; + case debouncedKey: + if (this.token === token) { + utils.emitSafe(this, 'debounced', message); + } + utils.emitSafe(this, 'global:debounced', message); + break; } }; @@ -513,7 +521,7 @@ Queue.prototype._setupQueueEventListeners = function() { }; Queue.prototype._registerEvent = function(eventName) { - const internalEvents = ['waiting', 'delayed', 'duplicated']; + const internalEvents = ['waiting', 'delayed', 'duplicated', 'debounced']; if ( eventName.startsWith('global:') || @@ -531,7 +539,7 @@ Queue.prototype._registerEvent = function(eventName) { .isRedisReady(this.eclient) .then(() => { const channel = this.toKey(_eventName); - if (['active', 'waiting', 'stalled', 'duplicated'].indexOf(_eventName) !== -1) { + if (['active', 'waiting', 'stalled', 'duplicated', 'debounced'].indexOf(_eventName) !== -1) { return (this.registeredEvents[_eventName] = this.eclient.psubscribe( channel + '*' )); @@ -782,6 +790,15 @@ Queue.prototype.retryJobs = async function(opts = {}) { } while (cursor); }; + /** + * Removes a debounce key. + * + * @param id - identifier + */ + Queue.prototype.removeDebounceKey = (id) => { + return this.client.del(`${this.keys.de}:${id}`); + } + /** Adds an array of jobs to the queue. @method add diff --git a/lib/scripts.js b/lib/scripts.js index 17a7ee26a..0d9a0756e 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -43,7 +43,10 @@ const scripts = { job.delay ? job.timestamp + job.delay : 0, opts.priority || 0, opts.lifo ? 'RPUSH' : 'LPUSH', - queue.token + queue.token, + job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null, + opts.debounce ? opts.debounce.id : null, + opts.debounce ? opts.debounce.ttl : null, ]; keys = keys.concat(args); return client.addJob(keys); @@ -401,7 +404,8 @@ const scripts = { queue.keys.priority, queue.toKey(jobId), queue.toKey(`${jobId}:logs`), - queue.keys.limiter + queue.keys.limiter, + queue.toKey(''), ]; return queue.client.removeJob(keys.concat([jobId, queue.token])); }, diff --git a/test/test_queue.js b/test/test_queue.js index 9c4d42d10..c57668a99 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -1137,6 +1137,146 @@ describe('Queue', () => { }); }); + describe('when job is debounced when added again with same debounce id', () => { + describe('when ttl is provided', () => { + it('used a fixed time period and emits debounced event', async () => { + const job = await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + + let debouncedCounter = 0; + let secondJob = null; + queue.on('debounced', (jobId) => { + if (debouncedCounter > 1) { + expect(jobId).to.be.equal(secondJob.id); + } else { + expect(jobId).to.be.equal(job.id); + } + debouncedCounter++; + }); + + await delay(1000); + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(1100); + secondJob = await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(100); + + expect(debouncedCounter).to.be.equal(4); + }); + }); + + describe('when removing debounced job', () => { + it('removes debounce key', async ()=> { + const job = await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + + let debouncedCounter = 0; + queue.on('debounced', () => { + debouncedCounter++; + }); + await job.remove(); + + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(1000); + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(1100); + const secondJob = await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await secondJob.remove(); + + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(100); + + expect(debouncedCounter).to.be.equal(2); + }); + }); + + describe('when ttl is not provided', ()=> { + it('waits until job is finished before removing debounce key', async ()=> { + queue.process( + async () => { + await delay(100); + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1' } }, + ); + await delay(100); + await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1' } }, + ); + await delay(100); + } + ); + + let debouncedCounter = 0; + + const completing = new Promise(resolve => { + queue.once('completed', ({ id }) => { + expect(id).to.be.equal('1'); + resolve(); + }); + + queue.on('debounced', () => { + debouncedCounter++; + }); + }); + + await queue.add({ foo: 'bar' }, { debounce: { id: 'a1' } }); + + await completing; + + const secondJob = await queue.add( + { foo: 'bar' }, + { debounce: { id: 'a1' } }, + ); + + const count = await queue.getJobCountByTypes(); + + expect(count).to.be.eql(2); + + expect(debouncedCounter).to.be.equal(2); + expect(secondJob.id).to.be.equal('4'); + }); + }); + }); + it('process a job that updates progress', done => { queue.process((job, jobDone) => { expect(job.data.foo).to.be.equal('bar');