diff --git a/lib/commands/addJob-6.lua b/lib/commands/addJob-6.lua index dbb28111e..2e82d004d 100644 --- a/lib/commands/addJob-6.lua +++ b/lib/commands/addJob-6.lua @@ -51,7 +51,7 @@ else jobId = ARGV[2] jobIdKey = ARGV[1] .. jobId if rcall("EXISTS", jobIdKey) == 1 then - rcall("PUBLISH", ARGV[1] .. "duplicated", jobId) + rcall("PUBLISH", ARGV[1] .. "duplicated@" .. ARGV[11], jobId) return jobId .. "" -- convert to string end end diff --git a/lib/queue.js b/lib/queue.js index 216918e88..c04646d26 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -440,6 +440,9 @@ Queue.prototype._setupQueueEventListeners = function() { utils.emitSafe(this, 'global:stalled', message); break; case duplicatedKey: + if (this.token === token) { + utils.emitSafe(this, 'duplicated', message); + } utils.emitSafe(this, 'global:duplicated', message); break; } @@ -510,7 +513,7 @@ Queue.prototype._setupQueueEventListeners = function() { }; Queue.prototype._registerEvent = function(eventName) { - const internalEvents = ['waiting', 'delayed']; + const internalEvents = ['waiting', 'delayed', 'duplicated']; if ( eventName.startsWith('global:') || diff --git a/test/test_queue.js b/test/test_queue.js index 647a8797f..9c4d42d10 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -1098,7 +1098,7 @@ describe('Queue', () => { }); describe('when job has been added again', () => { - it('emits duplicated event', async () => { + it('emits global duplicated event', async () => { queue.process( async () => { await delay(50); @@ -1116,6 +1116,25 @@ describe('Queue', () => { }); }); }); + + it('emits duplicated event', async () => { + queue.process( + async () => { + await delay(50); + await queue.add({ foo: 'bar' }, { jobId: 'a1' }); + await delay(50); + } + ); + + await queue.add({ foo: 'bar' }, { jobId: 'a1' }); + + await new Promise(resolve => { + queue.once('duplicated', (jobId) => { + expect(jobId).to.be.equal('a1'); + resolve(); + }); + }); + }); }); it('process a job that updates progress', done => {