Skip to content

Commit

Permalink
fix: reenque_orphaned for RedisStorage (#468)
Browse files Browse the repository at this point in the history
  • Loading branch information
AzHicham authored Nov 28, 2024
1 parent 47af9df commit 9b572d0
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,9 +947,7 @@ where
let active_jobs_list = self.config.active_jobs_list();
let signal_list = self.config.signal_list();

let now = Utc::now();
let duration = now.signed_duration_since(dead_since);
let dead_since = duration.num_seconds();
let dead_since = dead_since.timestamp();

let res: Result<usize, RedisError> = reenqueue_orphaned
.key(consumers_set)
Expand All @@ -968,7 +966,7 @@ where

#[cfg(test)]
mod tests {
use apalis_core::generic_storage_test;
use apalis_core::{generic_storage_test, sleep};
use email_service::Email;

use apalis_core::test_utils::apalis_test_service_fn;
Expand Down Expand Up @@ -1101,19 +1099,22 @@ mod tests {
}

#[tokio::test]
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() {
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_1sec() {
let mut storage = setup().await;

push_email(&mut storage, example_email()).await;

let worker_id = register_worker_at(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap();
storage
sleep(Duration::from_millis(1000)).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(1)).unwrap();
let res = storage
.reenqueue_orphaned(1, dead_since)
.await
.expect("failed to reenqueue_orphaned");
// We expect 1 job to be re-enqueued
assert_eq!(res, 1);
let job = get_job(&mut storage, &job.parts.task_id).await;
let ctx = &job.parts.context;
// assert_eq!(*ctx.status(), State::Pending);
Expand All @@ -1126,19 +1127,21 @@ mod tests {
}

#[tokio::test]
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() {
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_5sec() {
let mut storage = setup().await;

push_email(&mut storage, example_email()).await;

let worker_id = register_worker_at(&mut storage).await;

sleep(Duration::from_millis(1100)).await;
let job = consume_one(&mut storage, &worker_id).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap();
storage
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(5)).unwrap();
let res = storage
.reenqueue_orphaned(1, dead_since)
.await
.expect("failed to reenqueue_orphaned");
// We expect 0 job to be re-enqueued
assert_eq!(res, 0);
let job = get_job(&mut storage, &job.parts.task_id).await;
let _ctx = &job.parts.context;
// assert_eq!(*ctx.status(), State::Running);
Expand Down

0 comments on commit 9b572d0

Please sign in to comment.