From a28379a1375b44f116a07e60f1daa0ef203b363b Mon Sep 17 00:00:00 2001 From: Nicolas Simonds Date: Sat, 23 Mar 2024 12:03:04 -0700 Subject: [PATCH] RedisBroker: Track non-idempotent jobs as running, too Push running-job markers down into the Redis for all jobs, and move the logic for re-enqueueing jobs from dead brokers into the script itself. Non-idempotent jobs running on a dead broker are still NOT re-enqueued. This should cause non-idempotent jobs to no longer run "invisibly" on a Redis broker, as well as causing dead brokers to signal any non-idempotent jobs that were running on them as failed. Fixes: Issue #18 --- .../enqueue_jobs_from_dead_broker.lua | 6 ++++-- .../brokers/redis_scripts/get_jobs_from_queue.lua | 12 +++++------- tests/test_redis_brokers.py | 15 ++++++++++++--- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua b/spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua index f71fa59..10ae932 100644 --- a/spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua +++ b/spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua @@ -16,7 +16,8 @@ local jobs_json = redis.call('hvals', running_jobs_key) for _, job_json in ipairs(jobs_json) do local job = cjson.decode(job_json) - if job["retries"] < job["max_retries"] then + -- `max_retries == 0` jobs are non-idempotent, do not re-run them + if job["max_retries"] > 0 and job["retries"] < job["max_retries"] then job["retries"] = job["retries"] + 1 -- Set job status to queued: -- A major difference between retrying a job failing in a worker and @@ -41,7 +42,8 @@ for _, job_json in ipairs(jobs_json) do redis.call('rpush', queue, job_json) num_enqueued_jobs = num_enqueued_jobs + 1 else - -- Keep track of jobs that exceeded the max_retries + -- Keep track of jobs that exceeded the max_retries (or were not + -- retryable) failed_jobs[i] = job_json i = i + 1 end diff --git a/spinach/brokers/redis_scripts/get_jobs_from_queue.lua b/spinach/brokers/redis_scripts/get_jobs_from_queue.lua index 7c5cc54..d5959c0 100644 --- a/spinach/brokers/redis_scripts/get_jobs_from_queue.lua +++ b/spinach/brokers/redis_scripts/get_jobs_from_queue.lua @@ -30,13 +30,11 @@ repeat job["status"] = job_status_running local job_json = cjson.encode(job) - if job["max_retries"] > 0 then - -- job is idempotent, must track if it's running - redis.call('hset', running_jobs_key, job["id"], job_json) - -- If tracking concurrency, bump the current value. - if max_concurrency ~= -1 then - redis.call('hincrby', current_concurrency_key, job['task_name'], 1) - end + -- track the running job + redis.call('hset', running_jobs_key, job["id"], job_json) + -- If tracking concurrency, bump the current value. + if max_concurrency ~= -1 then + redis.call('hincrby', current_concurrency_key, job['task_name'], 1) end jobs[i] = job_json diff --git a/tests/test_redis_brokers.py b/tests/test_redis_brokers.py index 67679a1..829709e 100644 --- a/tests/test_redis_brokers.py +++ b/tests/test_redis_brokers.py @@ -79,7 +79,11 @@ def test_running_job(broker): broker.enqueue_jobs([job]) assert broker._r.hget(running_jobs_key, str(job.id)) is None broker.get_jobs_from_queue('foo_queue', 1) - assert broker._r.hget(running_jobs_key, str(job.id)) is None + job.status = JobStatus.RUNNING + assert ( + Job.deserialize(broker._r.hget(running_jobs_key, str(job.id)).decode()) + == job + ) # Try to remove it, even if it doesn't exist in running broker.remove_job_from_running(job) @@ -281,8 +285,13 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2): ) assert current == b'1' - # Mark broker as dead, should re-enqueue only the idempotent jobs. - assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (2, []) + # Mark broker as dead, should re-enqueue only the idempotent jobs. The + # non-idempotent job will report as failed. + num_requeued, failed = broker_2.enqueue_jobs_from_dead_broker(broker._id) + assert num_requeued == 2 + jobs = [Job.deserialize(job.decode()) for job in failed] + job_1.status = JobStatus.RUNNING + assert [job_1] == jobs # Check that the current_concurrency was decremented for job_3. current = broker._r.hget(