diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/debouncing.md index b4225838f8..ce87711d3c 100644 --- a/docs/gitbook/guide/jobs/debouncing.md +++ b/docs/gitbook/guide/jobs/debouncing.md @@ -44,6 +44,14 @@ This mode is particularly useful for jobs that have a long running time or those Any manual deletion will disable the debouncing. For example, when calling _job.remove_ method. {% endhint %} +## Get Debounce Job Id + +If you need to know which is the job id that started the debounce state. You can call **getDebounceJobId** method. + +```typescript +const jobId = await myQueue.getDebounceJobId('customValue'); +``` + ## Remove Debounce Key If you need to stop debouncing before ttl finishes or before finishing a job. You can call **removeDebounceKey** method. diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index c910f6028c..584dc2f799 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -113,6 +113,17 @@ export class QueueGetters< return this.scripts.getRateLimitTtl(maxJobs); } + /** + * Get jobId that starts debounced state. + * + * @param id - debounce identifier + */ + async getDebounceJobId(id: string): Promise { + const client = await this.client; + + return client.get(`${this.keys.de}:${id}`); + } + /** * Job counts by type * diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 8df615fe24..e9c1b0baa9 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -252,6 +252,103 @@ describe('flows', () => { }).timeout(8000); }); + describe('when child is debounced when added again with same debounce id', function () { + describe('when ttl is not provided', function () { + it('waits until job is finished before removing debounce key', async function () { + const parentQueueName = `parent-queue-${v4()}`; + + const flow = new FlowProducer({ connection, prefix }); + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + + const worker = new Worker( + queueName, + async job => { + await delay(100); + + const jobIdFromDebounceKey = await queue.getDebounceJobId( + 'debounce_id', + ); + expect(jobIdFromDebounceKey).to.be.equal(job.id); + + await flow.add({ + name: 'parent', + data: {}, + queueName: parentQueueName, + children: [ + { + queueName, + name: 'child0', + data: {}, + opts: { + debounce: { + id: 'debounce_id', + }, + }, + }, + ], + }); + + await delay(100); + }, + { + autorun: false, + connection, + prefix, + }, + ); + await worker.waitUntilReady(); + + const { children } = await flow.add({ + name: 'parent', + data: {}, + queueName: parentQueueName, + children: [ + { + queueName, + name: 'child0', + data: {}, + opts: { + debounce: { + id: 'debounce_id', + }, + }, + }, + ], + }); + + let debouncedCounter = 0; + + const completing = new Promise(resolve => { + queueEvents.once('completed', ({ jobId }) => { + expect(children![0].job.id).to.be.equal(jobId); + resolve(); + }); + + queueEvents.on('debounced', ({ jobId }) => { + debouncedCounter++; + }); + }); + + worker.run(); + + await completing; + + const jobIdFromDebounceKey = await queue.getDebounceJobId( + 'debounce_id', + ); + expect(jobIdFromDebounceKey).to.be.null; + + expect(debouncedCounter).to.be.equal(1); + + await worker.close(); + await queueEvents.close(); + await flow.close(); + await removeAllQueueData(new IORedis(redisHost), parentQueueName); + }); + }); + }); + it('should process children before the parent', async () => { const name = 'child-job'; const values = [