From 9b572d0fe8ea3f4f1de35c023a2dbae220a4ec27 Mon Sep 17 00:00:00 2001 From: Hicham Azimani Date: Thu, 28 Nov 2024 16:40:05 +0100 Subject: [PATCH] fix: reenque_orphaned for RedisStorage (#468) --- packages/apalis-redis/src/storage.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index c5f5fee..d29a080 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -947,9 +947,7 @@ where let active_jobs_list = self.config.active_jobs_list(); let signal_list = self.config.signal_list(); - let now = Utc::now(); - let duration = now.signed_duration_since(dead_since); - let dead_since = duration.num_seconds(); + let dead_since = dead_since.timestamp(); let res: Result = reenqueue_orphaned .key(consumers_set) @@ -968,7 +966,7 @@ where #[cfg(test)] mod tests { - use apalis_core::generic_storage_test; + use apalis_core::{generic_storage_test, sleep}; use email_service::Email; use apalis_core::test_utils::apalis_test_service_fn; @@ -1101,7 +1099,7 @@ mod tests { } #[tokio::test] - async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_1sec() { let mut storage = setup().await; push_email(&mut storage, example_email()).await; @@ -1109,11 +1107,14 @@ mod tests { let worker_id = register_worker_at(&mut storage).await; let job = consume_one(&mut storage, &worker_id).await; - let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap(); - storage + sleep(Duration::from_millis(1000)).await; + let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(1)).unwrap(); + let res = storage .reenqueue_orphaned(1, dead_since) .await .expect("failed to reenqueue_orphaned"); + // We expect 1 job to be re-enqueued + assert_eq!(res, 1); let job = get_job(&mut storage, &job.parts.task_id).await; let ctx = &job.parts.context; // assert_eq!(*ctx.status(), State::Pending); @@ -1126,19 +1127,21 @@ mod tests { } #[tokio::test] - async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_5sec() { let mut storage = setup().await; push_email(&mut storage, example_email()).await; let worker_id = register_worker_at(&mut storage).await; - + sleep(Duration::from_millis(1100)).await; let job = consume_one(&mut storage, &worker_id).await; - let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap(); - storage + let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(5)).unwrap(); + let res = storage .reenqueue_orphaned(1, dead_since) .await .expect("failed to reenqueue_orphaned"); + // We expect 0 job to be re-enqueued + assert_eq!(res, 0); let job = get_job(&mut storage, &job.parts.task_id).await; let _ctx = &job.parts.context; // assert_eq!(*ctx.status(), State::Running);