Skip to content

Commit

Permalink
fix: get tests working
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Dec 1, 2024
1 parent a383e5d commit 408fc4f
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 55 deletions.
10 changes: 7 additions & 3 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ impl Worker<Context> {
}
false
}
/// Start running the worker
pub fn start(&self) {
self.state.running.store(false, Ordering::Relaxed);
self.state.is_ready.store(false, Ordering::Relaxed);
self.emit(Event::Start);
}
}

impl<Req, Ctx> FromRequest<Request<Req, Ctx>> for Worker<Context> {
Expand Down Expand Up @@ -370,10 +376,8 @@ impl Future for Runnable {
let poller_future = async { while (poller.next().await).is_some() {} };

if !this.running {
worker.running.store(true, Ordering::Relaxed);
worker.is_ready.store(true, Ordering::Release);
worker.start();
this.running = true;
worker.emit(Event::Start);
}
let combined = Box::pin(join(poller_future, heartbeat.as_mut()));

Expand Down
33 changes: 17 additions & 16 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,7 @@ where

#[cfg(test)]
mod tests {
use apalis_core::worker::Context;
use apalis_core::{generic_storage_test, sleep};
use email_service::Email;

Expand Down Expand Up @@ -1021,17 +1022,17 @@ mod tests {
.clone()
}

async fn register_worker_at(storage: &mut RedisStorage<Email>) -> WorkerId {
let worker = WorkerId::new("test-worker");

async fn register_worker_at(storage: &mut RedisStorage<Email>) -> Worker<Context> {
let worker = Worker::new(WorkerId::new("test-worker"), Context::default());
worker.start();
storage
.keep_alive(&worker)
.keep_alive(&worker.id())
.await
.expect("failed to register worker");
worker
}

async fn register_worker(storage: &mut RedisStorage<Email>) -> WorkerId {
async fn register_worker(storage: &mut RedisStorage<Email>) -> Worker<Context> {
register_worker_at(storage).await
}

Expand All @@ -1055,19 +1056,19 @@ mod tests {
let mut storage = setup().await;
push_email(&mut storage, example_email()).await;

let worker_id = register_worker(&mut storage).await;
let worker = register_worker(&mut storage).await;

let _job = consume_one(&mut storage, &worker_id).await;
let _job = consume_one(&mut storage, &worker.id()).await;
}

#[tokio::test]
async fn test_acknowledge_job() {
let mut storage = setup().await;
push_email(&mut storage, example_email()).await;

let worker_id = register_worker(&mut storage).await;
let worker = register_worker(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker.id()).await;
let ctx = &job.parts.context;
let res = 42usize;
storage
Expand All @@ -1087,13 +1088,13 @@ mod tests {

push_email(&mut storage, example_email()).await;

let worker_id = register_worker(&mut storage).await;
let worker = register_worker(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker.id()).await;
let job_id = &job.parts.task_id;

storage
.kill(&worker_id, &job_id)
.kill(&worker.id(), &job_id)
.await
.expect("failed to kill job");

Expand All @@ -1106,9 +1107,9 @@ mod tests {

push_email(&mut storage, example_email()).await;

let worker_id = register_worker_at(&mut storage).await;
let worker = register_worker_at(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker.id()).await;
sleep(Duration::from_millis(1000)).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(1)).unwrap();
let res = storage
Expand All @@ -1134,9 +1135,9 @@ mod tests {

push_email(&mut storage, example_email()).await;

let worker_id = register_worker_at(&mut storage).await;
let worker = register_worker_at(&mut storage).await;
sleep(Duration::from_millis(1100)).await;
let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker.id()).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(5)).unwrap();
let res = storage
.reenqueue_orphaned(1, dead_since)
Expand Down
11 changes: 7 additions & 4 deletions packages/apalis-sql/src/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,9 @@ mod tests {
.keep_alive_at::<DummyService>(&worker_id, last_seen)
.await
.expect("failed to register worker");
Worker::new(worker_id, Context::default())
let wrk = Worker::new(worker_id, Context::default());
wrk.start();
wrk
}

async fn register_worker(storage: &mut MysqlStorage<Email>) -> Worker<Context> {
Expand Down Expand Up @@ -813,7 +815,7 @@ mod tests {
// register a worker not responding since 6 minutes ago
let worker_id = WorkerId::new("test-worker");
let worker = Worker::new(worker_id, Context::default());

worker.start();
let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60);

let six_minutes_ago = Utc::now() - Duration::from_secs(60 * 6);
Expand Down Expand Up @@ -865,8 +867,9 @@ mod tests {

let worker_id = WorkerId::new("test-worker");
let worker = Worker::new(worker_id, Context::default());
worker.start();
storage
.keep_alive_at::<Email>(&worker_id, four_minutes_ago)
.keep_alive_at::<Email>(&worker.id(), four_minutes_ago)
.await
.unwrap();

Expand All @@ -890,7 +893,7 @@ mod tests {
.unwrap();
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker.id()));
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
assert!(ctx.lock_at().is_some());
assert_eq!(*ctx.last_error(), None);
assert_eq!(job.parts.attempt.current(), 1);
Expand Down
30 changes: 16 additions & 14 deletions packages/apalis-sql/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,17 +836,19 @@ mod tests {
async fn register_worker_at(
storage: &mut PostgresStorage<Email>,
last_seen: Timestamp,
) -> WorkerId {
) -> Worker<Context> {
let worker_id = WorkerId::new("test-worker");

storage
.keep_alive_at::<DummyService>(&worker_id, last_seen)
.await
.expect("failed to register worker");
worker_id
let wrk = Worker::new(worker_id, Context::default());
wrk.start();
wrk
}

async fn register_worker(storage: &mut PostgresStorage<Email>) -> WorkerId {
async fn register_worker(storage: &mut PostgresStorage<Email>) -> Worker<Context> {
register_worker_at(storage, Utc::now().timestamp()).await
}

Expand All @@ -872,16 +874,16 @@ mod tests {
let mut storage = setup().await;
push_email(&mut storage, example_email()).await;

let worker_id = register_worker(&mut storage).await;
let worker = register_worker(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker.id()).await;
let job_id = &job.parts.task_id;

// Refresh our job
let job = get_job(&mut storage, job_id).await;
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker_id.clone()));
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
assert!(ctx.lock_at().is_some());
}

Expand All @@ -891,13 +893,13 @@ mod tests {

push_email(&mut storage, example_email()).await;

let worker_id = register_worker(&mut storage).await;
let worker = register_worker(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker.id()).await;
let job_id = &job.parts.task_id;

storage
.kill(&worker_id, job_id)
.kill(&worker.id(), job_id)
.await
.expect("failed to kill job");

Expand All @@ -915,9 +917,9 @@ mod tests {
let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);
let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60);

let worker_id = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await;
let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker.id()).await;
storage
.reenqueue_orphaned(1, five_minutes_ago)
.await
Expand All @@ -943,9 +945,9 @@ mod tests {
let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60);
let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);

let worker_id = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await;
let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker.id()).await;
let ctx = &job.parts.context;

assert_eq!(*ctx.status(), State::Running);
Expand All @@ -958,7 +960,7 @@ mod tests {
let job = get_job(&mut storage, job_id).await;
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker_id));
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
assert!(ctx.lock_at().is_some());
assert_eq!(*ctx.last_error(), None);
assert_eq!(job.parts.attempt.current(), 0);
Expand Down
41 changes: 23 additions & 18 deletions packages/apalis-sql/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,10 +665,10 @@ mod tests {

async fn consume_one(
storage: &mut SqliteStorage<Email>,
worker_id: &WorkerId,
worker: &Worker<Context>,
) -> Request<Email, SqlContext> {
let mut stream = storage
.stream_jobs(worker_id, std::time::Duration::from_secs(10), 1)
.stream_jobs(worker, std::time::Duration::from_secs(10), 1)
.boxed();
stream
.next()
Expand All @@ -678,17 +678,22 @@ mod tests {
.expect("no job is pending")
}

async fn register_worker_at(storage: &mut SqliteStorage<Email>, last_seen: i64) -> WorkerId {
async fn register_worker_at(
storage: &mut SqliteStorage<Email>,
last_seen: i64,
) -> Worker<Context> {
let worker_id = WorkerId::new("test-worker");

storage
.keep_alive_at::<DummyService>(&worker_id, last_seen)
.await
.expect("failed to register worker");
worker_id
let wrk = Worker::new(worker_id, Context::default());
wrk.start();
wrk
}

async fn register_worker(storage: &mut SqliteStorage<Email>) -> WorkerId {
async fn register_worker(storage: &mut SqliteStorage<Email>) -> Worker<Context> {
register_worker_at(storage, Utc::now().timestamp()).await
}

Expand All @@ -710,26 +715,26 @@ mod tests {
#[tokio::test]
async fn test_consume_last_pushed_job() {
let mut storage = setup().await;
let worker_id = register_worker(&mut storage).await;
let worker = register_worker(&mut storage).await;

push_email(&mut storage, example_good_email()).await;
let len = storage.len().await.expect("Could not fetch the jobs count");
assert_eq!(len, 1);

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker).await;
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker_id.clone()));
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
assert!(ctx.lock_at().is_some());
}

#[tokio::test]
async fn test_acknowledge_job() {
let mut storage = setup().await;
let worker_id = register_worker(&mut storage).await;
let worker = register_worker(&mut storage).await;

push_email(&mut storage, example_good_email()).await;
let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker).await;
let job_id = &job.parts.task_id;
let ctx = &job.parts.context;
let res = 1usize;
Expand All @@ -753,13 +758,13 @@ mod tests {

push_email(&mut storage, example_good_email()).await;

let worker_id = register_worker(&mut storage).await;
let worker = register_worker(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker).await;
let job_id = &job.parts.task_id;

storage
.kill(&worker_id, job_id)
.kill(&worker.id(), job_id)
.await
.expect("failed to kill job");

Expand All @@ -778,9 +783,9 @@ mod tests {
let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);

let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60);
let worker_id = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await;
let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker).await;
let job_id = &job.parts.task_id;
storage
.reenqueue_orphaned(1, five_minutes_ago)
Expand All @@ -804,9 +809,9 @@ mod tests {

let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);
let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60);
let worker_id = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await;
let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await;

let job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker).await;
let job_id = &job.parts.task_id;
storage
.reenqueue_orphaned(1, six_minutes_ago)
Expand All @@ -816,7 +821,7 @@ mod tests {
let job = get_job(&mut storage, job_id).await;
let ctx = &job.parts.context;
assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker_id));
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
assert!(ctx.lock_at().is_some());
assert_eq!(*ctx.last_error(), None);
assert_eq!(job.parts.attempt.current(), 1);
Expand Down

0 comments on commit 408fc4f

Please sign in to comment.