Skip to content

Commit

Permalink
fix: better handle attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Nov 22, 2024
1 parent 1fae485 commit 166e8fe
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
5 changes: 3 additions & 2 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,9 +1110,10 @@ mod tests {
.await
.expect("failed to reenqueue_orphaned");
let job = get_job(&mut storage, &job.parts.task_id).await;
let ctx = &job.parts.context;
let _ctx = &job.parts.context;
// assert_eq!(*ctx.status(), State::Running);
assert_eq!(ctx.lock_by, Some(worker_id));
// TODO: update redis context
// assert_eq!(ctx.lock_by, Some(worker_id));
// assert!(ctx.lock_at().is_some());
// assert_eq!(*ctx.last_error(), None);
assert_eq!(job.parts.attempt.current(), 0);
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-sql/src/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl<T, C: Codec> MysqlStorage<T, C> {
let query = r#"Update jobs
INNER JOIN ( SELECT workers.id as worker_id, jobs.id as job_id from workers INNER JOIN jobs ON jobs.lock_by = workers.id WHERE jobs.status = "Running" AND workers.last_seen < ? AND workers.worker_type = ?
ORDER BY lock_at ASC LIMIT ?) as workers ON jobs.lock_by = workers.worker_id AND jobs.id = workers.job_id
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ="Job was abandoned", attempts = attempts + 1;"#;
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ="Job was abandoned";"#;

sqlx::query(query)
.bind(dead_since)
Expand Down Expand Up @@ -770,6 +770,6 @@ mod tests {
assert_eq!(*ctx.lock_by(), Some(worker_id));
assert!(ctx.lock_at().is_some());
assert_eq!(*ctx.last_error(), None);
assert_eq!(job.parts.attempt.current(), 0);
assert_eq!(job.parts.attempt.current(), 1);
}
}
2 changes: 1 addition & 1 deletion packages/apalis-sql/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ impl<T, C: Codec> PostgresStorage<T, C> {
let job_type = self.config.namespace.clone();
let mut tx = self.pool.acquire().await?;
let query = "UPDATE apalis.jobs
SET status = 'Pending', done_at = NULL, lock_by = NULL, lock_at = NULL, last_error = 'Job was abandoned', attempts = attempts + 1
SET status = 'Pending', done_at = NULL, lock_by = NULL, lock_at = NULL, last_error = 'Job was abandoned'
WHERE id IN
(SELECT jobs.id FROM apalis.jobs INNER JOIN apalis.workers ON lock_by = workers.id
WHERE status = 'Running'
Expand Down
9 changes: 4 additions & 5 deletions packages/apalis-sql/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ async fn fetch_next(
config: &Config,
) -> Result<Option<SqlRequest<String>>, sqlx::Error> {
let now: i64 = Utc::now().timestamp();
let update_query = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4";
let update_query = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3, attempts = attempts + 1 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4";
let job: Option<SqlRequest<String>> = sqlx::query_as(update_query)
.bind(id.to_string())
.bind(worker_id.to_string())
Expand Down Expand Up @@ -432,7 +432,7 @@ impl<T> SqliteStorage<T> {
let job_type = self.config.namespace.clone();
let mut tx = self.pool.acquire().await?;
let query = r#"Update Jobs
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ="Job was abandoned", attempts = attempts + 1
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ="Job was abandoned"
WHERE id in
(SELECT Jobs.id from Jobs INNER join Workers ON lock_by = Workers.id
WHERE status= "Running" AND workers.last_seen < ?1
Expand Down Expand Up @@ -502,15 +502,14 @@ impl<T: Sync + Send, Res: Serialize + Sync> Ack<T, Res> for SqliteStorage<T> {
async fn ack(&mut self, ctx: &Self::Context, res: &Response<Res>) -> Result<(), sqlx::Error> {
let pool = self.pool.clone();
let query =
"UPDATE Jobs SET status = ?4, done_at = strftime('%s','now'), last_error = ?3, attempts =?5 WHERE id = ?1 AND lock_by = ?2";
"UPDATE Jobs SET status = ?4, done_at = strftime('%s','now'), last_error = ?3 WHERE id = ?1 AND lock_by = ?2";
let result = serde_json::to_string(&res.inner.as_ref().map_err(|r| r.to_string()))
.map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?;
sqlx::query(query)
.bind(res.task_id.to_string())
.bind(ctx.lock_by().as_ref().unwrap().to_string())
.bind(result)
.bind(calculate_status(&res.inner).to_string())
.bind(res.attempt.current() as i64 + 1)
.execute(&pool)
.await?;
Ok(())
Expand Down Expand Up @@ -722,6 +721,6 @@ mod tests {
assert_eq!(*ctx.lock_by(), Some(worker_id));
assert!(ctx.lock_at().is_some());
assert_eq!(*ctx.last_error(), None);
assert_eq!(job.parts.attempt.current(), 0);
assert_eq!(job.parts.attempt.current(), 1);
}
}

0 comments on commit 166e8fe

Please sign in to comment.