Skip to content

Commit

Permalink
fix(reprocess-job): add marker if needed (#2406)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Feb 2, 2024
1 parent 5cecea1 commit 5923ed8
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 7 deletions.
28 changes: 28 additions & 0 deletions docs/gitbook/bullmq-pro/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
## [6.9.6](https://github.com/taskforcesh/bullmq-pro/compare/v6.9.5...v6.9.6) (2024-01-31)


### Bug Fixes

* **groups:** remove group when removing last job ([#199](https://github.com/taskforcesh/bullmq-pro/issues/199)) ([3066686](https://github.com/taskforcesh/bullmq-pro/commit/3066686df4851334efadd7024cc8566407eabd7f))

## [6.9.5](https://github.com/taskforcesh/bullmq-pro/compare/v6.9.4...v6.9.5) (2024-01-27)


### Bug Fixes

* **batches:** differentiate movetoBatchFinished responses ([#198](https://github.com/taskforcesh/bullmq-pro/issues/198)) ([bb74c50](https://github.com/taskforcesh/bullmq-pro/commit/bb74c501f19fabbb61c4cb637598591f508bd59d))

## [6.9.4](https://github.com/taskforcesh/bullmq-pro/compare/v6.9.3...v6.9.4) (2024-01-20)


### Bug Fixes

* **backoff:** set marker after adding delayed job ([#197](https://github.com/taskforcesh/bullmq-pro/issues/197)) ([50a012e](https://github.com/taskforcesh/bullmq-pro/commit/50a012e352b9608a2a7f36db0cd7643078e183ee))

## [6.9.3](https://github.com/taskforcesh/bullmq-pro/compare/v6.9.2...v6.9.3) (2024-01-18)


### Performance Improvements

* **prioritized:** get target list once in addPrioritizedJob ([#195](https://github.com/taskforcesh/bullmq-pro/issues/195)) ([51cf4a3](https://github.com/taskforcesh/bullmq-pro/commit/51cf4a34d645013a49c01b740cf280666ebc4c97))

## [6.9.2](https://github.com/taskforcesh/bullmq-pro/compare/v6.9.1...v6.9.2) (2024-01-17)


Expand Down
3 changes: 2 additions & 1 deletion python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
Expand Down Expand Up @@ -380,6 +380,7 @@ async def reprocessJob(self, job: Job, state: str):
keys.append(self.keys['wait'])
keys.append(self.keys['meta'])
keys.append(self.keys['paused'])
keys.append(self.keys['marker'])

args = [
job.id,
Expand Down
1 change: 1 addition & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ export class Scripts {
this.queue.keys.wait,
this.queue.keys.meta,
this.queue.keys.paused,
this.queue.keys.marker,
];

const args = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
KEYS[4] wait key
KEYS[5] meta
KEYS[6] paused key
KEYS[7] marker key
ARGV[1] job.id
ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH'
Expand All @@ -22,18 +23,22 @@
local rcall = redis.call;

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"

if (rcall("EXISTS", KEYS[1]) == 1) then
if rcall("EXISTS", KEYS[1]) == 1 then
local jobId = ARGV[1]
if (rcall("ZREM", KEYS[3], jobId) == 1) then
rcall("HDEL", KEYS[1], "finishedOn", "processedOn", ARGV[3])

local target = getTargetQueueList(KEYS[5], KEYS[4], KEYS[6])
rcall(ARGV[2], target, jobId)
local target, isPaused = getTargetQueueList(KEYS[5], KEYS[4], KEYS[6])
addJobInTargetList(target, KEYS[7], ARGV[2], isPaused, jobId)

local maxEvents = getOrSetMaxEvents(KEYS[5])
-- Emit waiting event
rcall("XADD", KEYS[2], "*", "event", "waiting", "jobId", jobId, "prev", ARGV[4]);
rcall("XADD", KEYS[2], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
"jobId", jobId, "prev", ARGV[4]);
return 1
else
return -3
Expand Down
4 changes: 2 additions & 2 deletions tests/test_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,10 @@ describe('connection', () => {
},
});

await expect(queueFail.close()).to.be.eventually.equal(undefined);

await expect(queueFail.waitUntilReady()).to.be.eventually.rejectedWith(
'connect ECONNREFUSED 127.0.0.1:1234',
);

await expect(queueFail.close()).to.be.eventually.equal(undefined);
});
});

0 comments on commit 5923ed8

Please sign in to comment.