diff --git a/src/commands/moveToDelayed-8.lua b/src/commands/moveToDelayed-8.lua index 77e6cd3488..f9673f4a70 100644 --- a/src/commands/moveToDelayed-8.lua +++ b/src/commands/moveToDelayed-8.lua @@ -53,8 +53,12 @@ if rcall("EXISTS", jobKey) == 1 then return -3 end + local maxEvents = rcall("HGET", KEYS[8], "opts.maxLenEvents") or 10000 + rcall("ZADD", delayedKey, score, jobId) - rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp) + rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed", + --rcall("XADD", KEYS[6], "*", "event", "delayed", + "jobId", jobId, "delay", delayedTimestamp) -- Check if we need to push a marker job to wake up sleeping workers. local target = getTargetQueueList(KEYS[8], KEYS[1], KEYS[7]) diff --git a/tests/test_events.ts b/tests/test_events.ts index 0c6b57ea4e..2fd012f93f 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -486,62 +486,177 @@ describe('events', function () { await worker.close(); }); - it('should trim events automatically', async () => { - const trimmedQueue = new Queue(queueName, { - connection, - streams: { - events: { - maxLen: 0, + describe('when maxLen is 0', function () { + it('should trim events automatically', async () => { + const trimmedQueue = new Queue(queueName, { + connection, + streams: { + events: { + maxLen: 0, + }, }, - }, - }); + }); - const worker = new Worker( - queueName, - async () => { - await delay(100); - }, - { connection }, - ); + const worker = new Worker( + queueName, + async () => { + await delay(100); + }, + { connection }, + ); - await trimmedQueue.waitUntilReady(); - await worker.waitUntilReady(); + await trimmedQueue.waitUntilReady(); + await worker.waitUntilReady(); - const client = await trimmedQueue.client; + const client = await trimmedQueue.client; - const waitCompletedEvent = new Promise(resolve => { - queueEvents.on( - 'completed', - after(3, async () => { - resolve(); - }), + const waitCompletedEvent = new Promise(resolve => { + queueEvents.on( + 'completed', + after(3, async () => { + resolve(); + }), + ); + }); + + await trimmedQueue.addBulk([ + { name: 'test', data: { foo: 'bar' } }, + { name: 'test', data: { foo: 'baz' } }, + { name: 'test', data: { foo: 'bar' } }, + ]); + + await waitCompletedEvent; + + const [[id, [_, drained]], [, [, completed]]] = await client.xrevrange( + trimmedQueue.keys.events, + '+', + '-', ); + + expect(drained).to.be.equal('drained'); + expect(completed).to.be.equal('completed'); + + const eventsLength = await client.xlen(trimmedQueue.keys.events); + + expect(eventsLength).to.be.lte(2); + + await worker.close(); + await trimmedQueue.close(); + await removeAllQueueData(new IORedis(), queueName); }); + }); - await trimmedQueue.addBulk([ - { name: 'test', data: { foo: 'bar' } }, - { name: 'test', data: { foo: 'baz' } }, - { name: 'test', data: { foo: 'bar' } }, - ]); + describe('when maxLen is greater than 0', function () { + it('should trim events so its length is at least the threshold', async () => { + const numJobs = 80; + const trimmedQueue = new Queue(queueName, { + connection, + streams: { + events: { + maxLen: 20, + }, + }, + }); - await waitCompletedEvent; + const worker = new Worker( + queueName, + async () => { + await delay(50); + }, + { connection }, + ); - const [[id, [_, drained]], [, [, completed]]] = await client.xrevrange( - trimmedQueue.keys.events, - '+', - '-', - ); + await trimmedQueue.waitUntilReady(); + await worker.waitUntilReady(); - expect(drained).to.be.equal('drained'); - expect(completed).to.be.equal('completed'); + const client = await trimmedQueue.client; - const eventsLength = await client.xlen(trimmedQueue.keys.events); + const waitCompletedEvent = new Promise(resolve => { + queueEvents.on( + 'completed', + after(numJobs, async () => { + resolve(); + }), + ); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(() => ({ + name: 'test', + data: { foo: 'bar' }, + })); - expect(eventsLength).to.be.lte(2); + await trimmedQueue.addBulk(jobs); - await worker.close(); - await trimmedQueue.close(); - await removeAllQueueData(new IORedis(), queueName); + await waitCompletedEvent; + + const eventsLength = await client.xlen(trimmedQueue.keys.events); + + expect(eventsLength).to.be.lte(35); + expect(eventsLength).to.be.gte(20); + + await worker.close(); + await trimmedQueue.close(); + await removeAllQueueData(new IORedis(), queueName); + }); + + describe('when jobs are moved to delayed', function () { + it('should trim events so its length is at least the threshold', async () => { + const numJobs = 80; + const trimmedQueue = new Queue(queueName, { + connection, + streams: { + events: { + maxLen: 20, + }, + }, + }); + + const i = 0; + const worker = new Worker( + queueName, + async () => { + await delay(50); + throw new Error('error'); + }, + { connection }, + ); + + await trimmedQueue.waitUntilReady(); + await worker.waitUntilReady(); + + const client = await trimmedQueue.client; + + const waitDelayedEvent = new Promise(resolve => { + queueEvents.on( + 'delayed', + after(numJobs, async () => { + resolve(); + }), + ); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(() => ({ + name: 'test', + data: { foo: 'bar' }, + opts: { + attempts: 2, + backoff: 5000, + }, + })); + await trimmedQueue.addBulk(jobs); + + await waitDelayedEvent; + + const eventsLength = await client.xlen(trimmedQueue.keys.events); + + expect(eventsLength).to.be.lte(35); + expect(eventsLength).to.be.gte(20); + + await worker.close(); + await trimmedQueue.close(); + await removeAllQueueData(new IORedis(), queueName); + }); + }); }); it('should trim events manually', async () => {