Skip to content

Commit

Permalink
feat(queue): add global:duplicated event when a duplicated is added (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jun 25, 2024
1 parent 75703e5 commit d632ac1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/commands/addJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ else
jobId = ARGV[2]
jobIdKey = ARGV[1] .. jobId
if rcall("EXISTS", jobIdKey) == 1 then
rcall("PUBLISH", ARGV[1] .. "duplicated", jobId)
return jobId .. "" -- convert to string
end
end
Expand Down
7 changes: 6 additions & 1 deletion lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ const Queue = function Queue(name, url, opts) {
'repeat',
'limiter',
'drained',
'duplicated',
'progress'
],
key => {
Expand Down Expand Up @@ -416,6 +417,7 @@ Queue.prototype._setupQueueEventListeners = function() {
const completedKey = this.keys.completed;
const failedKey = this.keys.failed;
const drainedKey = this.keys.drained;
const duplicatedKey = this.keys.duplicated;

const pmessageHandler = (pattern, channel, message) => {
const keyAndToken = channel.split('@');
Expand All @@ -437,6 +439,9 @@ Queue.prototype._setupQueueEventListeners = function() {
}
utils.emitSafe(this, 'global:stalled', message);
break;
case duplicatedKey:
utils.emitSafe(this, 'global:duplicated', message);
break;
}
};

Expand Down Expand Up @@ -523,7 +528,7 @@ Queue.prototype._registerEvent = function(eventName) {
.isRedisReady(this.eclient)
.then(() => {
const channel = this.toKey(_eventName);
if (['active', 'waiting', 'stalled'].indexOf(_eventName) !== -1) {
if (['active', 'waiting', 'stalled', 'duplicated'].indexOf(_eventName) !== -1) {
return (this.registeredEvents[_eventName] = this.eclient.psubscribe(
channel + '*'
));
Expand Down
21 changes: 21 additions & 0 deletions test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,27 @@ describe('Queue', () => {
}
});

describe('when job has been added again', () => {
it('emits duplicated event', async () => {
queue.process(
async () => {
await delay(50);
await queue.add({ foo: 'bar' }, { jobId: 'a1' });
await delay(50);
}
);

await queue.add({ foo: 'bar' }, { jobId: 'a1' });

await new Promise(resolve => {
queue.once('global:duplicated', (jobId) => {
expect(jobId).to.be.equal('a1');
resolve();
});
});
});
});

it('process a job that updates progress', done => {
queue.process((job, jobDone) => {
expect(job.data.foo).to.be.equal('bar');
Expand Down

0 comments on commit d632ac1

Please sign in to comment.