From 85f6f6f181003fafbf75304a268170f0d271ccc3 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sat, 30 Nov 2024 08:45:49 -0600 Subject: [PATCH] fix(flow): allow using removeOnFail and failParentOnFailure in parents (#2947) fixes #2229 --- .../moveParentFromWaitingChildrenToFailed.lua | 12 +- src/commands/includes/removeJobsOnFail.lua | 36 +++++ src/commands/moveStalledJobsToWait-9.lua | 29 +--- tests/test_flow.ts | 137 +++++++++++++++++- 4 files changed, 184 insertions(+), 30 deletions(-) create mode 100644 src/commands/includes/removeJobsOnFail.lua diff --git a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua index 4ad4723506..a87e4187b9 100644 --- a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua +++ b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua @@ -5,16 +5,19 @@ -- Includes --- @include "moveParentToWaitIfNeeded" --- @include "removeDeduplicationKeyIfNeeded" +--- @include "removeJobsOnFail" local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, parentId, jobIdKey, timestamp) if rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) == 1 then - rcall("ZADD", parentQueueKey .. ":failed", timestamp, parentId) + local parentQueuePrefix = parentQueueKey .. ":" + local parentFailedKey = parentQueueKey .. ":failed" + rcall("ZADD", parentFailedKey, timestamp, parentId) local failedReason = "child " .. jobIdKey .. " failed" rcall("HMSET", parentKey, "failedReason", failedReason, "finishedOn", timestamp) rcall("XADD", parentQueueKey .. ":events", "*", "event", "failed", "jobId", parentId, "failedReason", failedReason, "prev", "waiting-children") - local jobAttributes = rcall("HMGET", parentKey, "parent", "deid") + local jobAttributes = rcall("HMGET", parentKey, "parent", "deid", "opts") removeDeduplicationKeyIfNeeded(parentQueueKey .. ":", jobAttributes[2]) @@ -41,5 +44,10 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, end end end + + local parentRawOpts = jobAttributes[3] + local parentOpts = cjson.decode(parentRawOpts) + + removeJobsOnFail(parentQueuePrefix, parentFailedKey, parentId, parentOpts, timestamp) end end diff --git a/src/commands/includes/removeJobsOnFail.lua b/src/commands/includes/removeJobsOnFail.lua new file mode 100644 index 0000000000..a7fa14aad9 --- /dev/null +++ b/src/commands/includes/removeJobsOnFail.lua @@ -0,0 +1,36 @@ +--[[ + Functions to remove jobs when removeOnFail option is provided. +]] + +-- Includes +--- @include "removeJob" +--- @include "removeJobsByMaxAge" +--- @include "removeJobsByMaxCount" + +local function removeJobsOnFail(queueKeyPrefix, failedKey, jobId, opts, timestamp) + local removeOnFailType = type(opts["removeOnFail"]) + if removeOnFailType == "number" then + removeJobsByMaxCount(opts["removeOnFail"], + failedKey, queueKeyPrefix) + elseif removeOnFailType == "boolean" then + if opts["removeOnFail"] then + removeJob(jobId, false, queueKeyPrefix, + false --[[remove debounce key]]) + rcall("ZREM", failedKey, jobId) + end + elseif removeOnFailType ~= "nil" then + local maxAge = opts["removeOnFail"]["age"] + local maxCount = opts["removeOnFail"]["count"] + + if maxAge ~= nil then + removeJobsByMaxAge(timestamp, maxAge, + failedKey, queueKeyPrefix) + end + + if maxCount ~= nil and maxCount > 0 then + removeJobsByMaxCount(maxCount, failedKey, + queueKeyPrefix) + end + end +end + \ No newline at end of file diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 2e6161ebee..83a7af1cb2 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -30,9 +30,7 @@ local rcall = redis.call --- @include "includes/moveParentFromWaitingChildrenToFailed" --- @include "includes/moveParentToWaitIfNeeded" --- @include "includes/removeDeduplicationKeyIfNeeded" ---- @include "includes/removeJob" ---- @include "includes/removeJobsByMaxAge" ---- @include "includes/removeJobsByMaxCount" +--- @include "includes/removeJobsOnFail" --- @include "includes/trimEvents" local stalledKey = KEYS[1] @@ -86,7 +84,6 @@ if (#stalling > 0) then local rawOpts = jobAttributes[1] local rawParentData = jobAttributes[2] local opts = cjson.decode(rawOpts) - local removeOnFailType = type(opts["removeOnFail"]) rcall("ZADD", failedKey, timestamp, jobId) removeDeduplicationKeyIfNeeded(queueKeyPrefix, jobAttributes[3]) @@ -123,29 +120,7 @@ if (#stalling > 0) then end end - if removeOnFailType == "number" then - removeJobsByMaxCount(opts["removeOnFail"], - failedKey, queueKeyPrefix) - elseif removeOnFailType == "boolean" then - if opts["removeOnFail"] then - removeJob(jobId, false, queueKeyPrefix, - false --[[remove debounce key]]) - rcall("ZREM", failedKey, jobId) - end - elseif removeOnFailType ~= "nil" then - local maxAge = opts["removeOnFail"]["age"] - local maxCount = opts["removeOnFail"]["count"] - - if maxAge ~= nil then - removeJobsByMaxAge(timestamp, maxAge, - failedKey, queueKeyPrefix) - end - - if maxCount ~= nil and maxCount > 0 then - removeJobsByMaxCount(maxCount, failedKey, - queueKeyPrefix) - end - end + removeJobsOnFail(queueKeyPrefix, failedKey, jobId, opts, timestamp) table.insert(failed, jobId) else diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 0369a39988..75554abe01 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -2239,7 +2239,142 @@ describe('flows', () => { await removeAllQueueData(new IORedis(redisHost), parentQueueName); await removeAllQueueData(new IORedis(redisHost), grandChildrenQueueName); - }).timeout(8000); + }); + + describe('when removeOnFail option is provided', async () => { + it('should remove parent when child is moved to failed', async () => { + const name = 'child-job'; + + const parentQueueName = `parent-queue-${v4()}`; + const grandChildrenQueueName = `grand-children-queue-${v4()}`; + + const parentQueue = new Queue(parentQueueName, { + connection, + prefix, + }); + const grandChildrenQueue = new Queue(grandChildrenQueueName, { + connection, + prefix, + }); + const queueEvents = new QueueEvents(parentQueueName, { + connection, + prefix, + }); + await queueEvents.waitUntilReady(); + + let grandChildrenProcessor, + processedGrandChildren = 0; + const processingChildren = new Promise(resolve => { + grandChildrenProcessor = async () => { + processedGrandChildren++; + + if (processedGrandChildren === 2) { + return resolve(); + } + + await delay(200); + + throw new Error('failed'); + }; + }); + + const grandChildrenWorker = new Worker( + grandChildrenQueueName, + grandChildrenProcessor, + { connection, prefix }, + ); + + await grandChildrenWorker.waitUntilReady(); + + const flow = new FlowProducer({ connection, prefix }); + const tree = await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [ + { + name, + data: { foo: 'bar' }, + queueName, + }, + { + name, + data: { foo: 'qux' }, + queueName, + opts: { failParentOnFailure: true, removeOnFail: true }, + children: [ + { + name, + data: { foo: 'bar' }, + queueName: grandChildrenQueueName, + opts: { failParentOnFailure: true }, + }, + { + name, + data: { foo: 'baz' }, + queueName: grandChildrenQueueName, + }, + ], + }, + ], + }); + + const failed = new Promise(resolve => { + queueEvents.on('failed', async ({ jobId, failedReason, prev }) => { + if (jobId === tree.job.id) { + expect(prev).to.be.equal('waiting-children'); + expect(failedReason).to.be.equal( + `child ${prefix}:${queueName}:${tree.children[1].job.id} failed`, + ); + resolve(); + } + }); + }); + + expect(tree).to.have.property('job'); + expect(tree).to.have.property('children'); + + const { children, job } = tree; + const parentState = await job.getState(); + + expect(parentState).to.be.eql('waiting-children'); + + await processingChildren; + await failed; + + const { children: grandChildren } = children[1]; + const updatedGrandchildJob = await grandChildrenQueue.getJob( + grandChildren[0].job.id, + ); + const grandChildState = await updatedGrandchildJob.getState(); + + expect(grandChildState).to.be.eql('failed'); + expect(updatedGrandchildJob.failedReason).to.be.eql('failed'); + + const updatedParentJob = await queue.getJob(children[1].job.id); + expect(updatedParentJob).to.be.undefined; + + const updatedGrandparentJob = await parentQueue.getJob(job.id); + const updatedGrandparentState = await updatedGrandparentJob.getState(); + + expect(updatedGrandparentState).to.be.eql('failed'); + expect(updatedGrandparentJob.failedReason).to.be.eql( + `child ${prefix}:${queueName}:${children[1].job.id} failed`, + ); + + await parentQueue.close(); + await grandChildrenQueue.close(); + await grandChildrenWorker.close(); + await flow.close(); + await queueEvents.close(); + + await removeAllQueueData(new IORedis(redisHost), parentQueueName); + await removeAllQueueData( + new IORedis(redisHost), + grandChildrenQueueName, + ); + }); + }); describe('when removeDependencyOnFailure is provided', async () => { it('moves parent to wait after children fail', async () => {