diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index c550d26..920e56e 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -210,6 +210,12 @@ impl Worker { } false } + /// Start running the worker + pub fn start(&self) { + self.state.running.store(false, Ordering::Relaxed); + self.state.is_ready.store(false, Ordering::Relaxed); + self.emit(Event::Start); + } } impl FromRequest> for Worker { @@ -370,10 +376,8 @@ impl Future for Runnable { let poller_future = async { while (poller.next().await).is_some() {} }; if !this.running { - worker.running.store(true, Ordering::Relaxed); - worker.is_ready.store(true, Ordering::Release); + worker.start(); this.running = true; - worker.emit(Event::Start); } let combined = Box::pin(join(poller_future, heartbeat.as_mut())); diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index db52289..0d30c3b 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -968,6 +968,7 @@ where #[cfg(test)] mod tests { + use apalis_core::worker::Context; use apalis_core::{generic_storage_test, sleep}; use email_service::Email; @@ -1021,17 +1022,17 @@ mod tests { .clone() } - async fn register_worker_at(storage: &mut RedisStorage) -> WorkerId { - let worker = WorkerId::new("test-worker"); - + async fn register_worker_at(storage: &mut RedisStorage) -> Worker { + let worker = Worker::new(WorkerId::new("test-worker"), Context::default()); + worker.start(); storage - .keep_alive(&worker) + .keep_alive(&worker.id()) .await .expect("failed to register worker"); worker } - async fn register_worker(storage: &mut RedisStorage) -> WorkerId { + async fn register_worker(storage: &mut RedisStorage) -> Worker { register_worker_at(storage).await } @@ -1055,9 +1056,9 @@ mod tests { let mut storage = setup().await; push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + let worker = register_worker(&mut storage).await; - let _job = consume_one(&mut storage, &worker_id).await; + let _job = consume_one(&mut storage, &worker.id()).await; } #[tokio::test] @@ -1065,9 +1066,9 @@ mod tests { let mut storage = setup().await; push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + let worker = register_worker(&mut storage).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker.id()).await; let ctx = &job.parts.context; let res = 42usize; storage @@ -1087,13 +1088,13 @@ mod tests { push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + let worker = register_worker(&mut storage).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker.id()).await; let job_id = &job.parts.task_id; storage - .kill(&worker_id, &job_id) + .kill(&worker.id(), &job_id) .await .expect("failed to kill job"); @@ -1106,9 +1107,9 @@ mod tests { push_email(&mut storage, example_email()).await; - let worker_id = register_worker_at(&mut storage).await; + let worker = register_worker_at(&mut storage).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker.id()).await; sleep(Duration::from_millis(1000)).await; let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(1)).unwrap(); let res = storage @@ -1134,9 +1135,9 @@ mod tests { push_email(&mut storage, example_email()).await; - let worker_id = register_worker_at(&mut storage).await; + let worker = register_worker_at(&mut storage).await; sleep(Duration::from_millis(1100)).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker.id()).await; let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(5)).unwrap(); let res = storage .reenqueue_orphaned(1, dead_since) diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index 71b89a3..40976e5 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -735,7 +735,9 @@ mod tests { .keep_alive_at::(&worker_id, last_seen) .await .expect("failed to register worker"); - Worker::new(worker_id, Context::default()) + let wrk = Worker::new(worker_id, Context::default()); + wrk.start(); + wrk } async fn register_worker(storage: &mut MysqlStorage) -> Worker { @@ -813,7 +815,7 @@ mod tests { // register a worker not responding since 6 minutes ago let worker_id = WorkerId::new("test-worker"); let worker = Worker::new(worker_id, Context::default()); - + worker.start(); let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60); let six_minutes_ago = Utc::now() - Duration::from_secs(60 * 6); @@ -865,8 +867,9 @@ mod tests { let worker_id = WorkerId::new("test-worker"); let worker = Worker::new(worker_id, Context::default()); + worker.start(); storage - .keep_alive_at::(&worker_id, four_minutes_ago) + .keep_alive_at::(&worker.id(), four_minutes_ago) .await .unwrap(); @@ -890,7 +893,7 @@ mod tests { .unwrap(); let ctx = job.parts.context; assert_eq!(*ctx.status(), State::Running); - assert_eq!(*ctx.lock_by(), Some(worker.id())); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); assert!(ctx.lock_at().is_some()); assert_eq!(*ctx.last_error(), None); assert_eq!(job.parts.attempt.current(), 1); diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index f7284ef..77e96aa 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -836,17 +836,19 @@ mod tests { async fn register_worker_at( storage: &mut PostgresStorage, last_seen: Timestamp, - ) -> WorkerId { + ) -> Worker { let worker_id = WorkerId::new("test-worker"); storage .keep_alive_at::(&worker_id, last_seen) .await .expect("failed to register worker"); - worker_id + let wrk = Worker::new(worker_id, Context::default()); + wrk.start(); + wrk } - async fn register_worker(storage: &mut PostgresStorage) -> WorkerId { + async fn register_worker(storage: &mut PostgresStorage) -> Worker { register_worker_at(storage, Utc::now().timestamp()).await } @@ -872,16 +874,16 @@ mod tests { let mut storage = setup().await; push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + let worker = register_worker(&mut storage).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker.id()).await; let job_id = &job.parts.task_id; // Refresh our job let job = get_job(&mut storage, job_id).await; let ctx = job.parts.context; assert_eq!(*ctx.status(), State::Running); - assert_eq!(*ctx.lock_by(), Some(worker_id.clone())); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); assert!(ctx.lock_at().is_some()); } @@ -891,13 +893,13 @@ mod tests { push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + let worker = register_worker(&mut storage).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker.id()).await; let job_id = &job.parts.task_id; storage - .kill(&worker_id, job_id) + .kill(&worker.id(), job_id) .await .expect("failed to kill job"); @@ -915,9 +917,9 @@ mod tests { let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60); - let worker_id = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await; + let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker.id()).await; storage .reenqueue_orphaned(1, five_minutes_ago) .await @@ -943,9 +945,9 @@ mod tests { let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60); let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); - let worker_id = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; + let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker.id()).await; let ctx = &job.parts.context; assert_eq!(*ctx.status(), State::Running); @@ -958,7 +960,7 @@ mod tests { let job = get_job(&mut storage, job_id).await; let ctx = job.parts.context; assert_eq!(*ctx.status(), State::Running); - assert_eq!(*ctx.lock_by(), Some(worker_id)); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); assert!(ctx.lock_at().is_some()); assert_eq!(*ctx.last_error(), None); assert_eq!(job.parts.attempt.current(), 0); diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index 71380de..87db604 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -665,10 +665,10 @@ mod tests { async fn consume_one( storage: &mut SqliteStorage, - worker_id: &WorkerId, + worker: &Worker, ) -> Request { let mut stream = storage - .stream_jobs(worker_id, std::time::Duration::from_secs(10), 1) + .stream_jobs(worker, std::time::Duration::from_secs(10), 1) .boxed(); stream .next() @@ -678,17 +678,22 @@ mod tests { .expect("no job is pending") } - async fn register_worker_at(storage: &mut SqliteStorage, last_seen: i64) -> WorkerId { + async fn register_worker_at( + storage: &mut SqliteStorage, + last_seen: i64, + ) -> Worker { let worker_id = WorkerId::new("test-worker"); storage .keep_alive_at::(&worker_id, last_seen) .await .expect("failed to register worker"); - worker_id + let wrk = Worker::new(worker_id, Context::default()); + wrk.start(); + wrk } - async fn register_worker(storage: &mut SqliteStorage) -> WorkerId { + async fn register_worker(storage: &mut SqliteStorage) -> Worker { register_worker_at(storage, Utc::now().timestamp()).await } @@ -710,26 +715,26 @@ mod tests { #[tokio::test] async fn test_consume_last_pushed_job() { let mut storage = setup().await; - let worker_id = register_worker(&mut storage).await; + let worker = register_worker(&mut storage).await; push_email(&mut storage, example_good_email()).await; let len = storage.len().await.expect("Could not fetch the jobs count"); assert_eq!(len, 1); - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker).await; let ctx = job.parts.context; assert_eq!(*ctx.status(), State::Running); - assert_eq!(*ctx.lock_by(), Some(worker_id.clone())); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); assert!(ctx.lock_at().is_some()); } #[tokio::test] async fn test_acknowledge_job() { let mut storage = setup().await; - let worker_id = register_worker(&mut storage).await; + let worker = register_worker(&mut storage).await; push_email(&mut storage, example_good_email()).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker).await; let job_id = &job.parts.task_id; let ctx = &job.parts.context; let res = 1usize; @@ -753,13 +758,13 @@ mod tests { push_email(&mut storage, example_good_email()).await; - let worker_id = register_worker(&mut storage).await; + let worker = register_worker(&mut storage).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker).await; let job_id = &job.parts.task_id; storage - .kill(&worker_id, job_id) + .kill(&worker.id(), job_id) .await .expect("failed to kill job"); @@ -778,9 +783,9 @@ mod tests { let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60); - let worker_id = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await; + let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker).await; let job_id = &job.parts.task_id; storage .reenqueue_orphaned(1, five_minutes_ago) @@ -804,9 +809,9 @@ mod tests { let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60); - let worker_id = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; + let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; - let job = consume_one(&mut storage, &worker_id).await; + let job = consume_one(&mut storage, &worker).await; let job_id = &job.parts.task_id; storage .reenqueue_orphaned(1, six_minutes_ago) @@ -816,7 +821,7 @@ mod tests { let job = get_job(&mut storage, job_id).await; let ctx = &job.parts.context; assert_eq!(*ctx.status(), State::Running); - assert_eq!(*ctx.lock_by(), Some(worker_id)); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); assert!(ctx.lock_at().is_some()); assert_eq!(*ctx.last_error(), None); assert_eq!(job.parts.attempt.current(), 1);