diff --git a/Cargo.toml b/Cargo.toml
index 1da3a9a..a6424c0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -125,7 +125,7 @@ members = [
     "examples/catch-panic",
     "examples/graceful-shutdown",
     "examples/unmonitored-worker",
-    "examples/fn-args", "examples/persisted-cron",
+    "examples/fn-args", "examples/persisted-cron", "examples/rest-api",
 ]
diff --git a/examples/rest-api/Cargo.toml b/examples/rest-api/Cargo.toml
new file mode 100644
index 0000000..0dfc63a
--- /dev/null
+++ b/examples/rest-api/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "rest-api"
+version = "0.1.0"
+edition.workspace = true
+repository.workspace = true
+
+[dependencies]
+anyhow = "1"
+apalis = { path = "../../" }
+apalis-redis = { path = "../../packages/apalis-redis" }
+serde = "1"
+env_logger = "0.10"
+actix-web = "4"
+futures = "0.3"
+email-service = { path = "../email-service" }
diff --git a/examples/rest-api/src/main.rs b/examples/rest-api/src/main.rs
new file mode 100644
index 0000000..e8bc43a
--- /dev/null
+++ b/examples/rest-api/src/main.rs
@@ -0,0 +1,108 @@
+use actix_web::rt::signal;
+use actix_web::{web, App, HttpResponse, HttpServer};
+use anyhow::Result;
+use apalis::prelude::*;
+
+use apalis_redis::RedisStorage;
+use futures::future;
+
+use email_service::{send_email, Email};
+use serde::{Deserialize, Serialize};
+
+#[derive(Deserialize, Debug)]
+struct Filter {
+    #[serde(default)]
+    pub status: State,
+    #[serde(default = "default_page")]
+    pub page: i32,
+}
+
+fn default_page() -> i32 {
+    1
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct GetJobsResult<T> {
+    pub stats: Stat,
+    pub jobs: Vec<T>,
+}
+
+async fn push_job(
+    job: web::Json<Email>,
+    storage: web::Data<RedisStorage<Email>>,
+) -> HttpResponse {
+    let mut storage = (**storage).clone();
+    let res = storage.push(job.into_inner()).await;
+    match res {
+        Ok(parts) => {
+            HttpResponse::Ok().body(format!("Job with ID [{}] added to queue", parts.task_id))
+        }
+        Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
+    }
+}
+
+async fn get_jobs(
+    storage: web::Data<RedisStorage<Email>>,
+    filter: web::Query<Filter>,
+) -> HttpResponse {
+    let stats = storage.stats().await.unwrap_or_default();
+    let res = storage.list_jobs(&filter.status, filter.page).await;
+    match res {
+        Ok(jobs) => HttpResponse::Ok().json(GetJobsResult { stats, jobs }),
+        Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
+    }
+}
+
+async fn get_workers(storage: web::Data<RedisStorage<Email>>) -> HttpResponse {
+    let workers = storage.list_workers().await;
+    match workers {
+        Ok(workers) => HttpResponse::Ok().json(workers),
+        Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
+    }
+}
+
+async fn get_job(
+    job_id: web::Path<TaskId>,
+    storage: web::Data<RedisStorage<Email>>,
+) -> HttpResponse {
+    let mut storage = (**storage).clone();
+
+    let res = storage.fetch_by_id(&job_id).await;
+    match res {
+        Ok(Some(job)) => HttpResponse::Ok().json(job),
+        Ok(None) => HttpResponse::NotFound().finish(),
+        Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
+    }
+}
+
+#[actix_web::main]
+async fn main() -> Result<()> {
+    std::env::set_var("RUST_LOG", "debug");
+    env_logger::init();
+
+    let conn = apalis_redis::connect("redis://127.0.0.1/").await?;
+    let storage = RedisStorage::new(conn);
+    let data = web::Data::new(storage.clone());
+    let http = async {
+        HttpServer::new(move || {
+            App::new()
+                .app_data(data.clone())
+                .route("/", web::get().to(get_jobs)) // Fetch jobs in the queue
+                .route("/workers", web::get().to(get_workers)) // Fetch workers
+                .route("/job", web::put().to(push_job)) // Allow adding jobs via the API
+                .route("/job/{job_id}", web::get().to(get_job)) // Allow fetching a specific job
+        })
+        .bind("127.0.0.1:8000")?
+        .run()
+        .await?;
+        Ok(())
+    };
+    let worker = Monitor::new()
+        .register({
+            WorkerBuilder::new("tasty-avocado")
+                .enable_tracing()
+                .backend(storage)
+                .build_fn(send_email)
+        })
+        .run_with_signal(signal::ctrl_c());
+
+    future::try_join(http, worker).await?;
+    Ok(())
+}
diff --git a/packages/apalis-core/src/backend.rs b/packages/apalis-core/src/backend.rs
new file mode 100644
index 0000000..eae050e
--- /dev/null
+++ b/packages/apalis-core/src/backend.rs
@@ -0,0 +1,92 @@
+use std::{any::type_name, future::Future};
+
+use futures::Stream;
+use serde::{Deserialize, Serialize};
+use tower::Service;
+
+use crate::{
+    poller::Poller,
+    request::State,
+    worker::{Context, Worker},
+};
+
+/// A backend represents a task source.
+/// Both [`Storage`] and [`MessageQueue`] need to implement it for workers to be able to consume tasks.
+///
+/// [`Storage`]: crate::storage::Storage
+/// [`MessageQueue`]: crate::mq::MessageQueue
+pub trait Backend<Req> {
+    /// The stream to be produced by the backend
+    type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;
+
+    /// Returns the final decoration of layers
+    type Layer;
+
+    /// Returns a poller that is ready for streaming
+    fn poll<Svc: Service<Req>>(
+        self,
+        worker: &Worker<Context>,
+    ) -> Poller<Self::Stream, Self::Layer>;
+}
+
+/// Represents functionality that allows reading jobs and stats from a backend.
+/// Some backends, especially message queues, may not currently implement this.
+pub trait BackendExpose<T>
+where
+    Self: Sized,
+{
+    /// The request type being handled by the backend
+    type Request;
+    /// The error returned while reading jobs and stats
+    type Error;
+    /// List all workers that are working on a backend
+    fn list_workers(
+        &self,
+    ) -> impl Future<Output = Result<Vec<Worker<WorkerState>>, Self::Error>> + Send;
+
+    /// Returns the counts of jobs in different states
+    fn stats(&self) -> impl Future<Output = Result<Stat, Self::Error>> + Send;
+
+    /// Fetch jobs persisted in a backend
+    fn list_jobs(
+        &self,
+        status: &State,
+        page: i32,
+    ) -> impl Future<Output = Result<Vec<Self::Request>, Self::Error>> + Send;
+}
+
+/// Represents the current statistics of a backend
+#[derive(Debug, Deserialize, Serialize, Default)]
+pub struct Stat {
+    /// Represents pending tasks
+    pub pending: usize,
+    /// Represents running tasks
+    pub running: usize,
+    /// Represents dead tasks
+    pub dead: usize,
+    /// Represents failed tasks
+    pub failed: usize,
+    /// Represents successful tasks
+    pub success: usize,
+}
+
+/// A serializable version of a worker's state.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct WorkerState {
+    /// Type of task being consumed by the worker, useful for display and filtering
+    pub r#type: String,
+    /// The type of job stream
+    pub source: String,
+    // TODO: // The layers that were loaded for worker.
+    // TODO: // pub layers: Vec,
+    // TODO: // last_seen: Timestamp,
+}
+impl WorkerState {
+    /// Build a new state
+    pub fn new<S>(r#type: String) -> Self {
+        Self {
+            r#type,
+            source: type_name::<S>().to_string(),
+        }
+    }
+}
diff --git a/packages/apalis-core/src/builder.rs b/packages/apalis-core/src/builder.rs
index aca69c9..b362509 100644
--- a/packages/apalis-core/src/builder.rs
+++ b/packages/apalis-core/src/builder.rs
@@ -7,13 +7,13 @@ use tower::{
 };
 
 use crate::{
+    backend::Backend,
     error::Error,
     layers::extensions::Data,
     request::Request,
     service_fn::service_fn,
     service_fn::ServiceFn,
     worker::{Ready, Worker, WorkerId},
-    Backend,
 };
 
 /// Allows building a [`Worker`].
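// Reviewer sketch, not part of the patch: how downstream code might consume the new
// `BackendExpose` trait added in backend.rs above. The helper name `queue_overview`
// is hypothetical; it relies only on the trait methods introduced by this PR.
use apalis_core::backend::{BackendExpose, Stat};
use apalis_core::request::State;

async fn queue_overview<B, T>(backend: &B) -> Result<Stat, B::Error>
where
    B: BackendExpose<T>,
{
    // Counts of jobs per state (pending/running/dead/failed/success).
    let stats = backend.stats().await?;
    // First page of pending jobs; pages are 1-indexed and ten jobs long.
    let _pending = backend.list_jobs(&State::Pending, 1).await?;
    // Workers currently registered against this backend.
    let _workers = backend.list_workers().await?;
    Ok(stats)
}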
diff --git a/packages/apalis-core/src/codec/json.rs b/packages/apalis-core/src/codec/json.rs
index 864f160..35376ad 100644
--- a/packages/apalis-core/src/codec/json.rs
+++ b/packages/apalis-core/src/codec/json.rs
@@ -1,6 +1,6 @@
 use std::marker::PhantomData;
 
-use crate::Codec;
+use crate::codec::Codec;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
diff --git a/packages/apalis-core/src/codec/mod.rs b/packages/apalis-core/src/codec/mod.rs
index a2d1f55..c12d6fa 100644
--- a/packages/apalis-core/src/codec/mod.rs
+++ b/packages/apalis-core/src/codec/mod.rs
@@ -1,3 +1,23 @@
+use serde::{Deserialize, Serialize};
+
+use crate::error::BoxDynError;
+
+/// A codec allows backends to encode and decode data
+pub trait Codec {
+    /// The compact format the codec stores data in
+    type Compact;
+    /// Error encountered by the codec
+    type Error: Into<BoxDynError>;
+    /// The encoding method
+    fn encode<I>(input: I) -> Result<Self::Compact, Self::Error>
+    where
+        I: Serialize;
+    /// The decoding method
+    fn decode<O>(input: Self::Compact) -> Result<O, Self::Error>
+    where
+        O: for<'de> Deserialize<'de>;
+}
+
 /// Encoding for tasks using json
 #[cfg(feature = "json")]
 pub mod json;
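// Reviewer sketch, not part of the patch: a minimal `Codec` implementation showing the
// contract of the trait that now lives in codec/mod.rs. It mirrors what `JsonCodec`
// does but stores jobs as JSON strings; the name `JsonStringCodec` is illustrative.
use apalis_core::codec::Codec;
use serde::{Deserialize, Serialize};

struct JsonStringCodec;

impl Codec for JsonStringCodec {
    // Jobs are persisted in this compact representation.
    type Compact = String;
    // serde_json::Error converts into BoxDynError, satisfying the trait bound.
    type Error = serde_json::Error;

    fn encode<I: Serialize>(input: I) -> Result<Self::Compact, Self::Error> {
        serde_json::to_string(&input)
    }

    fn decode<O: for<'de> Deserialize<'de>>(input: Self::Compact) -> Result<O, Self::Error> {
        serde_json::from_str(&input)
    }
}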
diff --git a/packages/apalis-core/src/lib.rs b/packages/apalis-core/src/lib.rs
index a0dc50f..6e356d4 100644
--- a/packages/apalis-core/src/lib.rs
+++ b/packages/apalis-core/src/lib.rs
@@ -22,15 +22,11 @@
 #![cfg_attr(docsrs, feature(doc_cfg))]
 //! # apalis-core
 //! Utilities for building job and message processing tools.
-use error::BoxDynError;
-use futures::Stream;
-use poller::Poller;
-use serde::{Deserialize, Serialize};
-use tower::Service;
-use worker::{Context, Worker};
-
 /// Represent utilities for creating worker instances.
 pub mod builder;
+
+/// Represents a task source, e.g. Postgres or Redis
+pub mod backend;
 /// Includes all possible error types.
 pub mod error;
 
 /// Represents middleware offered through [`tower`]
@@ -66,40 +62,6 @@ pub mod task;
 /// Codec for handling data
 pub mod codec;
 
-/// A backend represents a task source
-/// Both [`Storage`] and [`MessageQueue`] need to implement it for workers to be able to consume tasks
-///
-/// [`Storage`]: crate::storage::Storage
-/// [`MessageQueue`]: crate::mq::MessageQueue
-pub trait Backend<Req> {
-    /// The stream to be produced by the backend
-    type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;
-
-    /// Returns the final decoration of layers
-    type Layer;
-
-    /// Returns a poller that is ready for streaming
-    fn poll<Svc: Service<Req>>(
-        self,
-        worker: &Worker<Context>,
-    ) -> Poller<Self::Stream, Self::Layer>;
-}
-/// A codec allows backends to encode and decode data
-pub trait Codec {
-    /// The mode of storage by the codec
-    type Compact;
-    /// Error encountered by the codec
-    type Error: Into<BoxDynError>;
-    /// The encoding method
-    fn encode<I>(input: I) -> Result<Self::Compact, Self::Error>
-    where
-        I: Serialize;
-    /// The decoding method
-    fn decode<O>(input: Self::Compact) -> Result<O, Self::Error>
-    where
-        O: for<'de> Deserialize<'de>;
-}
-
 /// Sleep utilities
 #[cfg(feature = "sleep")]
 pub async fn sleep(duration: std::time::Duration) {
@@ -162,11 +124,11 @@ pub mod interval {
 #[cfg(feature = "test-utils")]
 /// Test utilities that allows you to test backends
 pub mod test_utils {
+    use crate::backend::Backend;
     use crate::error::BoxDynError;
    use crate::request::Request;
     use crate::task::task_id::TaskId;
     use crate::worker::{Worker, WorkerId};
-    use crate::Backend;
     use futures::channel::mpsc::{channel, Receiver, Sender};
     use futures::future::BoxFuture;
     use futures::stream::{Stream, StreamExt};
diff --git a/packages/apalis-core/src/memory.rs b/packages/apalis-core/src/memory.rs
index 7234b19..7caeb1c 100644
--- a/packages/apalis-core/src/memory.rs
+++ b/packages/apalis-core/src/memory.rs
@@ -1,9 +1,10 @@
 use crate::{
+    backend::Backend,
     mq::MessageQueue,
+    poller::Poller,
     poller::{controller::Controller, stream::BackendStream},
     request::{Request, RequestStream},
     worker::{self, Worker},
-    Backend, Poller,
 };
 use futures::{
     channel::mpsc::{channel, Receiver, Sender},
diff --git a/packages/apalis-core/src/monitor/mod.rs b/packages/apalis-core/src/monitor/mod.rs
index 4f164ee..895337f 100644
--- a/packages/apalis-core/src/monitor/mod.rs
+++ b/packages/apalis-core/src/monitor/mod.rs
@@ -11,10 +11,10 @@ use tower::{Layer, Service};
 pub mod shutdown;
 
 use crate::{
+    backend::Backend,
     error::BoxDynError,
     request::Request,
     worker::{Context, Event, EventHandler, Ready, Worker, WorkerId},
-    Backend,
 };
 
 use self::shutdown::Shutdown;
diff --git a/packages/apalis-core/src/request.rs b/packages/apalis-core/src/request.rs
index f1167d6..21d04ef 100644
--- a/packages/apalis-core/src/request.rs
+++ b/packages/apalis-core/src/request.rs
@@ -2,15 +2,15 @@ use futures::{future::BoxFuture, Stream};
 use serde::{Deserialize, Serialize};
 use tower::layer::util::Identity;
 
-use std::{fmt::Debug, pin::Pin};
+use std::{fmt, fmt::Debug, pin::Pin, str::FromStr};
 
 use crate::{
+    backend::Backend,
     data::Extensions,
     error::Error,
     poller::Poller,
     task::{attempt::Attempt, namespace::Namespace, task_id::TaskId},
     worker::{Context, Worker},
-    Backend,
 };
 
 /// Represents a job which can be serialized and executed
@@ -43,6 +43,7 @@ pub struct Parts<Ctx> {
     /// Represents the namespace
     #[serde(skip)]
     pub namespace: Option<Namespace>,
+    //TODO: add State
 }
 
 impl<T, Ctx> Request<T, Ctx> {
@@ -98,6 +99,59 @@ impl<T, Ctx> std::ops::DerefMut for Request<T, Ctx> {
     }
 }
 
+/// Represents the state of a job/task
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, std::cmp::Eq)]
+pub enum State {
+    /// Job is pending
+    #[serde(alias = "Latest")]
+    Pending,
+    /// Job is in the queue but not ready for execution
+    Scheduled,
+    /// Job is running
+    Running,
+    /// Job was done successfully
+    Done,
+    /// Job has failed. Check `last_error`
+    Failed,
+    /// Job has been killed
+    Killed,
+}
+
+impl Default for State {
+    fn default() -> Self {
+        State::Pending
+    }
+}
+
+impl FromStr for State {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "Pending" | "Latest" => Ok(State::Pending),
+            "Running" => Ok(State::Running),
+            "Done" => Ok(State::Done),
+            "Failed" => Ok(State::Failed),
+            "Killed" => Ok(State::Killed),
+            "Scheduled" => Ok(State::Scheduled),
+            _ => Err(Error::MissingData("Invalid Job state".to_string())),
+        }
+    }
+}
+
+impl fmt::Display for State {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match &self {
+            State::Pending => write!(f, "Pending"),
+            State::Running => write!(f, "Running"),
+            State::Done => write!(f, "Done"),
+            State::Failed => write!(f, "Failed"),
+            State::Killed => write!(f, "Killed"),
+            State::Scheduled => write!(f, "Scheduled"),
+        }
+    }
+}
+
 /// Represents a stream that is send
 pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs
index d3fb364..31b527f 100644
--- a/packages/apalis-core/src/worker/mod.rs
+++ b/packages/apalis-core/src/worker/mod.rs
@@ -1,10 +1,10 @@
+use crate::backend::Backend;
 use crate::error::{BoxDynError, Error};
 use crate::layers::extensions::Data;
 use crate::monitor::shutdown::Shutdown;
 use crate::request::Request;
 use crate::service_fn::FromRequest;
 use crate::task::task_id::TaskId;
-use crate::Backend;
 use futures::future::{join, select, BoxFuture};
 use futures::stream::BoxStream;
 use futures::{Future, FutureExt, Stream, StreamExt};
@@ -160,7 +160,7 @@ impl<S, P> Ready<S, P> {
 }
 
 /// Represents a generic [Worker] that can be in many different states
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize)]
 pub struct Worker<T> {
     pub(crate) id: WorkerId,
     pub(crate) state: T,
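// Reviewer sketch, not part of the patch: the `State` enum that moved into apalis-core
// above round-trips through `Display`/`FromStr`. The REST example's `Filter` parses the
// same names through the serde derive, including the backwards-compatible `Latest` alias.
use apalis_core::request::State;
use std::str::FromStr;

fn main() {
    assert_eq!(State::from_str("Failed").unwrap(), State::Failed);
    // "Latest" is kept as an alias for Pending.
    assert_eq!(State::from_str("Latest").unwrap(), State::Pending);
    assert_eq!(State::Scheduled.to_string(), "Scheduled");
    assert!(State::from_str("Bogus").is_err());
}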
diff --git a/packages/apalis-cron/src/lib.rs b/packages/apalis-cron/src/lib.rs
index 849f118..4db6d62 100644
--- a/packages/apalis-cron/src/lib.rs
+++ b/packages/apalis-cron/src/lib.rs
@@ -57,6 +57,7 @@
 //!     }
 //! }
 //! ```
+use apalis_core::backend::Backend;
 use apalis_core::error::BoxDynError;
 use apalis_core::layers::Identity;
 use apalis_core::mq::MessageQueue;
@@ -65,7 +66,6 @@ use apalis_core::request::RequestStream;
 use apalis_core::storage::Storage;
 use apalis_core::task::namespace::Namespace;
 use apalis_core::worker::{Context, Worker};
-use apalis_core::Backend;
 use apalis_core::{error::Error, request::Request};
 use chrono::{DateTime, TimeZone, Utc};
 pub use cron::Schedule;
diff --git a/packages/apalis-cron/src/pipe.rs b/packages/apalis-cron/src/pipe.rs
index be5ada4..bad125a 100644
--- a/packages/apalis-cron/src/pipe.rs
+++ b/packages/apalis-cron/src/pipe.rs
@@ -1,6 +1,6 @@
+use apalis_core::backend::Backend;
 use apalis_core::error::BoxDynError;
 use apalis_core::request::BoxStream;
-use apalis_core::Backend;
 use apalis_core::{poller::Poller, request::Request, worker::Context, worker::Worker};
 use futures::StreamExt;
 use std::{error, fmt};
diff --git a/packages/apalis-redis/src/expose.rs b/packages/apalis-redis/src/expose.rs
new file mode 100644
index 0000000..4e4e426
--- /dev/null
+++ b/packages/apalis-redis/src/expose.rs
@@ -0,0 +1,258 @@
+use crate::RedisContext;
+use crate::RedisStorage;
+use apalis_core::backend::BackendExpose;
+use apalis_core::backend::Stat;
+use apalis_core::backend::WorkerState;
+use apalis_core::codec::json::JsonCodec;
+use apalis_core::codec::Codec;
+use apalis_core::request::Request;
+use apalis_core::request::State;
+use apalis_core::worker::Worker;
+use apalis_core::worker::WorkerId;
+use redis::{ErrorKind, Value};
+use serde::{de::DeserializeOwned, Serialize};
+
+type RedisCodec = JsonCodec<Vec<u8>>;
+
+impl<T> BackendExpose<T> for RedisStorage<T>
+where
+    T: 'static + Serialize + DeserializeOwned + Send + Unpin + Sync,
+{
+    type Request = Request<T, RedisContext>;
+    type Error = redis::RedisError;
+    async fn stats(&self) -> Result<Stat, redis::RedisError> {
+        let mut conn = self.get_connection().clone();
+        let queue = self.get_config();
+        let script = r#"
+            local pending_jobs_set = KEYS[1]
+            local running_jobs_set = KEYS[2]
+            local dead_jobs_set = KEYS[3]
+            local failed_jobs_set = KEYS[4]
+            local success_jobs_set = KEYS[5]
+
+            local pending_count = redis.call('ZCARD', pending_jobs_set)
+            local running_count = redis.call('ZCARD', running_jobs_set)
+            local dead_count = redis.call('ZCARD', dead_jobs_set)
+            local failed_count = redis.call('ZCARD', failed_jobs_set)
+            local success_count = redis.call('ZCARD', success_jobs_set)
+
+            return {pending_count, running_count, dead_count, failed_count, success_count}
+        "#;
+
+        let keys = vec![
+            queue.inflight_jobs_set().to_string(),
+            queue.active_jobs_list().to_string(),
+            queue.dead_jobs_set().to_string(),
+            queue.failed_jobs_set().to_string(),
+            queue.done_jobs_set().to_string(),
+        ];
+
+        let results: Vec<usize> = redis::cmd("EVAL")
+            .arg(script)
+            .arg(keys.len().to_string())
+            .arg(keys)
+            .query_async(&mut conn)
+            .await?;
+
+        Ok(Stat {
+            pending: results[0],
+            running: results[1],
+            dead: results[2],
+            failed: results[3],
+            success: results[4],
+        })
+    }
+    async fn list_jobs(
+        &self,
+        status: &State,
+        page: i32,
+    ) -> Result<Vec<Self::Request>, redis::RedisError> {
+        let mut conn = self.get_connection().clone();
+        let queue = self.get_config();
+        match status {
+            State::Pending | State::Scheduled => {
+                let active_jobs_list = &queue.active_jobs_list();
+                let job_data_hash = &queue.job_data_hash();
+                // LRANGE bounds are inclusive, so a 10-item page ends at page * 10 - 1.
+                let ids: Vec<String> = redis::cmd("LRANGE")
+                    .arg(active_jobs_list)
+                    .arg(((page - 1) * 10).to_string())
+                    .arg((page * 10 - 1).to_string())
+                    .query_async(&mut conn)
+                    .await?;
+
+                if ids.is_empty() {
+                    return Ok(Vec::new());
+                }
+                let data: Option<Value> = redis::cmd("HMGET")
+                    .arg(job_data_hash)
+                    .arg(&ids)
+                    .query_async(&mut conn)
+                    .await?;
+
+                let jobs: Vec<Request<T, RedisContext>> =
+                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
+                Ok(jobs)
+            }
+            State::Running => {
+                let consumers_set = &queue.consumers_set();
+                let job_data_hash = &queue.job_data_hash();
+                let workers: Vec<String> = redis::cmd("ZRANGE")
+                    .arg(consumers_set)
+                    .arg("0")
+                    .arg("-1")
+                    .query_async(&mut conn)
+                    .await?;
+
+                if workers.is_empty() {
+                    return Ok(Vec::new());
+                }
+                let mut all_jobs = Vec::new();
+                for worker in workers {
+                    let ids: Vec<String> = redis::cmd("SMEMBERS")
+                        .arg(&worker)
+                        .query_async(&mut conn)
+                        .await?;
+
+                    if ids.is_empty() {
+                        continue;
+                    };
+                    let data: Option<Value> = redis::cmd("HMGET")
+                        .arg(job_data_hash.clone())
+                        .arg(&ids)
+                        .query_async(&mut conn)
+                        .await?;
+
+                    let jobs: Vec<Request<T, RedisContext>> =
+                        deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
+                    all_jobs.extend(jobs);
+                }
+
+                Ok(all_jobs)
+            }
+            State::Done => {
+                let done_jobs_set = &queue.done_jobs_set();
+                let job_data_hash = &queue.job_data_hash();
+                let ids: Vec<String> = redis::cmd("ZRANGE")
+                    .arg(done_jobs_set)
+                    .arg(((page - 1) * 10).to_string())
+                    .arg((page * 10 - 1).to_string())
+                    .query_async(&mut conn)
+                    .await?;
+
+                if ids.is_empty() {
+                    return Ok(Vec::new());
+                }
+                let data: Option<Value> = redis::cmd("HMGET")
+                    .arg(job_data_hash)
+                    .arg(&ids)
+                    .query_async(&mut conn)
+                    .await?;
+
+                let jobs: Vec<Request<T, RedisContext>> =
+                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
+                Ok(jobs)
+            }
+            // State::Retry => Ok(Vec::new()),
+            State::Failed => {
+                let failed_jobs_set = &queue.failed_jobs_set();
+                let job_data_hash = &queue.job_data_hash();
+                let ids: Vec<String> = redis::cmd("ZRANGE")
+                    .arg(failed_jobs_set)
+                    .arg(((page - 1) * 10).to_string())
+                    .arg((page * 10 - 1).to_string())
+                    .query_async(&mut conn)
+                    .await?;
+                if ids.is_empty() {
+                    return Ok(Vec::new());
+                }
+                let data: Option<Value> = redis::cmd("HMGET")
+                    .arg(job_data_hash)
+                    .arg(&ids)
+                    .query_async(&mut conn)
+                    .await?;
+                let jobs: Vec<Request<T, RedisContext>> =
+                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
+
+                Ok(jobs)
+            }
+            State::Killed => {
+                let dead_jobs_set = &queue.dead_jobs_set();
+                let job_data_hash = &queue.job_data_hash();
+                let ids: Vec<String> = redis::cmd("ZRANGE")
+                    .arg(dead_jobs_set)
+                    .arg(((page - 1) * 10).to_string())
+                    .arg((page * 10 - 1).to_string())
+                    .query_async(&mut conn)
+                    .await?;
+
+                if ids.is_empty() {
+                    return Ok(Vec::new());
+                }
+                let data: Option<Value> = redis::cmd("HMGET")
+                    .arg(job_data_hash)
+                    .arg(&ids)
+                    .query_async(&mut conn)
+                    .await?;
+
+                let jobs: Vec<Request<T, RedisContext>> =
+                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
+
+                Ok(jobs)
+            }
+        }
+    }
+    async fn list_workers(&self) -> Result<Vec<Worker<WorkerState>>, redis::RedisError> {
+        let queue = self.get_config();
+        let consumers_set = &queue.consumers_set();
+        let mut conn = self.get_connection().clone();
+        let workers: Vec<String> = redis::cmd("ZRANGE")
+            .arg(consumers_set)
+            .arg("0")
+            .arg("-1")
+            .query_async(&mut conn)
+            .await?;
+        Ok(workers
+            .into_iter()
+            .map(|w| {
+                Worker::new(
+                    WorkerId::new(w.replace(&format!("{}:", &queue.inflight_jobs_set()), "")),
+                    WorkerState::new::<Self::Request>(queue.get_namespace().to_owned()),
+                )
+            })
+            .collect())
+    }
+}
+
+fn deserialize_multiple_jobs<T, C>(jobs: Option<&Value>) -> Option<Vec<Request<T, RedisContext>>>
+where
+    T: DeserializeOwned,
+    C: Codec<Compact = Vec<u8>>,
+{
+    let jobs = match jobs {
+        None => None,
+        Some(Value::Array(val)) => Some(val),
+        _ => {
+            // error!(
+            //     "Decoding Message Failed: {:?}",
+            //     "unknown result type for next message"
+            // );
+            None
+        }
+    };
+
+    jobs.map(|values| {
+        values
+            .iter()
+            .filter_map(|v| match v {
+                Value::BulkString(data) => {
+                    let inner = C::decode(data.to_vec())
+                        .map_err(|e| (ErrorKind::IoError, "Decode error", e.into().to_string()))
+                        .unwrap();
+                    Some(inner)
+                }
+                _ => None,
+            })
+            .collect()
+    })
+}
diff --git a/packages/apalis-redis/src/lib.rs b/packages/apalis-redis/src/lib.rs
index 764e4ab..fb91bf2 100644
--- a/packages/apalis-redis/src/lib.rs
+++ b/packages/apalis-redis/src/lib.rs
@@ -27,6 +27,7 @@
 //! }
 //! ```
 
+mod expose;
 mod storage;
 pub use storage::connect;
 pub use storage::Config;
diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs
index a2609d3..e335c21 100644
--- a/packages/apalis-redis/src/storage.rs
+++ b/packages/apalis-redis/src/storage.rs
@@ -11,7 +11,7 @@ use apalis_core::storage::Storage;
 use apalis_core::task::namespace::Namespace;
 use apalis_core::task::task_id::TaskId;
 use apalis_core::worker::{Event, Worker, WorkerId};
-use apalis_core::{Backend, Codec};
+use apalis_core::{backend::Backend, codec::Codec};
 use chrono::{DateTime, Utc};
 use futures::channel::mpsc::{self, SendError, Sender};
 use futures::{select, FutureExt, SinkExt, StreamExt, TryFutureExt};
diff --git a/packages/apalis-sql/src/context.rs b/packages/apalis-sql/src/context.rs
index 84900ce..40a37d0 100644
--- a/packages/apalis-sql/src/context.rs
+++ b/packages/apalis-sql/src/context.rs
@@ -1,10 +1,9 @@
-use apalis_core::error::Error;
 use apalis_core::request::Request;
 use apalis_core::service_fn::FromRequest;
 use apalis_core::worker::WorkerId;
+use apalis_core::{error::Error, request::State};
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use std::{fmt, str::FromStr};
 
 /// The context for a job is represented here
 /// Used to provide a context for a job with an sql backend
@@ -115,52 +114,3 @@ impl FromRequest<Request<T, SqlContext>> for SqlContext {
         Ok(req.parts.context.clone())
     }
 }
-
-/// Represents the state of a job
-#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, std::cmp::Eq)]
-pub enum State {
-    /// Job is pending
-    #[serde(alias = "Latest")]
-    Pending,
-    /// Job is running
-    Running,
-    /// Job was done successfully
-    Done,
-    /// Job has failed. Check `last_error`
-    Failed,
-    /// Job has been killed
-    Killed,
-}
-
-impl Default for State {
-    fn default() -> Self {
-        State::Pending
-    }
-}
-
-impl FromStr for State {
-    type Err = Error;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s {
-            "Pending" | "Latest" => Ok(State::Pending),
-            "Running" => Ok(State::Running),
-            "Done" => Ok(State::Done),
-            "Failed" => Ok(State::Failed),
-            "Killed" => Ok(State::Killed),
-            _ => Err(Error::MissingData("Invalid Job state".to_string())),
-        }
-    }
-}
-
-impl fmt::Display for State {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match &self {
-            State::Pending => write!(f, "Pending"),
-            State::Running => write!(f, "Running"),
-            State::Done => write!(f, "Done"),
-            State::Failed => write!(f, "Failed"),
-            State::Killed => write!(f, "Killed"),
-        }
-    }
-}
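// Reviewer sketch, not part of the patch: the JSON shape that `Stat` (and therefore the
// REST example's `/` endpoint, inside `GetJobsResult`) serializes to. serde emits the
// fields in declaration order; the counts here are made up.
use apalis_core::backend::Stat;

fn main() {
    let stat = Stat {
        pending: 5,
        running: 2,
        dead: 0,
        failed: 1,
        success: 20,
    };
    assert_eq!(
        serde_json::to_string(&stat).unwrap(),
        r#"{"pending":5,"running":2,"dead":0,"failed":1,"success":20}"#
    );
}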
diff --git a/packages/apalis-sql/src/lib.rs b/packages/apalis-sql/src/lib.rs
index 0a9c6ab..4d3a1e0 100644
--- a/packages/apalis-sql/src/lib.rs
+++ b/packages/apalis-sql/src/lib.rs
@@ -10,10 +10,9 @@
 //! apalis offers Sqlite, Mysql and Postgres storages for its workers.
 //! See relevant modules for examples
-use std::time::Duration;
+use std::{num::TryFromIntError, time::Duration};
 
-use apalis_core::error::Error;
-use context::State;
+use apalis_core::{error::Error, request::State};
 
 /// The context of the sql job
 pub mod context;
@@ -49,6 +48,17 @@ pub struct Config {
     namespace: String,
 }
 
+/// A general sql error
+#[derive(Debug, thiserror::Error)]
+pub enum SqlError {
+    /// Handles sqlx errors
+    #[error("sqlx::Error: {0}")]
+    Sqlx(#[from] sqlx::Error),
+    /// Handles int conversion errors
+    #[error("TryFromIntError: {0}")]
+    TryFromInt(#[from] TryFromIntError),
+}
+
 impl Default for Config {
     fn default() -> Self {
         Self {
diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs
index 5b235fc..85c713c 100644
--- a/packages/apalis-sql/src/mysql.rs
+++ b/packages/apalis-sql/src/mysql.rs
@@ -1,3 +1,4 @@
+use apalis_core::backend::{BackendExpose, Stat, WorkerState};
 use apalis_core::codec::json::JsonCodec;
 use apalis_core::error::{BoxDynError, Error};
 use apalis_core::layers::{Ack, AckLayer};
@@ -5,13 +6,13 @@ use apalis_core::notify::Notify;
 use apalis_core::poller::controller::Controller;
 use apalis_core::poller::stream::BackendStream;
 use apalis_core::poller::Poller;
-use apalis_core::request::{Parts, Request, RequestStream};
+use apalis_core::request::{Parts, Request, RequestStream, State};
 use apalis_core::response::Response;
 use apalis_core::storage::Storage;
 use apalis_core::task::namespace::Namespace;
 use apalis_core::task::task_id::TaskId;
 use apalis_core::worker::{Context, Event, Worker, WorkerId};
-use apalis_core::{Backend, Codec};
+use apalis_core::{backend::Backend, codec::Codec};
 use async_stream::try_stream;
 use chrono::{DateTime, Utc};
 use futures::{Stream, StreamExt, TryStreamExt};
@@ -29,10 +30,12 @@ use std::{marker::PhantomData, ops::Add, time::Duration};
 
 use crate::context::SqlContext;
 use crate::from_row::SqlRequest;
-use crate::{calculate_status, Config};
+use crate::{calculate_status, Config, SqlError};
 
 pub use sqlx::mysql::MySqlPool;
 
+type MysqlCodec = JsonCodec<String>;
+
 /// Represents a [Storage] that persists to MySQL
 pub struct MysqlStorage<T, C = JsonCodec<String>>
@@ -573,10 +576,76 @@ impl<T, C> MysqlStorage<T, C> {
     }
 }
 
+impl<J: 'static + Serialize + DeserializeOwned + Send + Sync + Unpin> BackendExpose<J>
+    for MysqlStorage<J>
+{
+    type Request = Request<J, SqlContext>;
+    type Error = SqlError;
+    async fn stats(&self) -> Result<Stat, Self::Error> {
+        let fetch_query = "SELECT
+            COUNT(CASE WHEN status = 'Pending' THEN 1 END) AS pending,
+            COUNT(CASE WHEN status = 'Running' THEN 1 END) AS running,
+            COUNT(CASE WHEN status = 'Done' THEN 1 END) AS done,
+            COUNT(CASE WHEN status = 'Retry' THEN 1 END) AS retry,
+            COUNT(CASE WHEN status = 'Failed' THEN 1 END) AS failed,
+            COUNT(CASE WHEN status = 'Killed' THEN 1 END) AS killed
+            FROM jobs WHERE job_type = ?";
+
+        let res: (i64, i64, i64, i64, i64, i64) = sqlx::query_as(fetch_query)
+            .bind(self.get_config().namespace())
+            .fetch_one(self.pool())
+            .await?;
+
+        Ok(Stat {
+            pending: res.0.try_into()?,
+            running: res.1.try_into()?,
+            dead: res.4.try_into()?,
+            failed: res.3.try_into()?,
+            success: res.2.try_into()?,
+        })
+    }
+
+    async fn list_jobs(
+        &self,
+        status: &State,
+        page: i32,
+    ) -> Result<Vec<Self::Request>, Self::Error> {
+        let status = status.to_string();
+        let fetch_query = "SELECT * FROM jobs WHERE status = ? AND job_type = ? ORDER BY done_at DESC, run_at DESC LIMIT 10 OFFSET ?";
+        let res: Vec<SqlRequest<String>> = sqlx::query_as(fetch_query)
+            .bind(status)
+            .bind(self.get_config().namespace())
+            .bind(((page - 1) * 10).to_string())
+            .fetch_all(self.pool())
+            .await?;
+        Ok(res
+            .into_iter()
+            .map(|j| {
+                let (req, ctx) = j.req.take_parts();
+                let req: J = MysqlCodec::decode(req).unwrap();
+                Request::new_with_ctx(req, ctx)
+            })
+            .collect())
+    }
+
+    async fn list_workers(&self) -> Result<Vec<Worker<WorkerState>>, Self::Error> {
+        let fetch_query =
+            "SELECT id, layers, last_seen FROM workers WHERE worker_type = ? ORDER BY last_seen DESC LIMIT 20 OFFSET ?";
+        let res: Vec<(String, String, i64)> = sqlx::query_as(fetch_query)
+            .bind(self.get_config().namespace())
+            .bind(0)
+            .fetch_all(self.pool())
+            .await?;
+        Ok(res
+            .into_iter()
+            .map(|w| Worker::new(WorkerId::new(w.0), WorkerState::new::<Self::Request>(w.1)))
+            .collect())
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::context::State;
     use crate::sql_storage_tests;
 
     use super::*;
diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs
index 5ba1fb5..26664f2 100644
--- a/packages/apalis-sql/src/postgres.rs
+++ b/packages/apalis-sql/src/postgres.rs
@@ -39,7 +39,8 @@
 //! }
 //! ```
 use crate::context::SqlContext;
-use crate::{calculate_status, Config};
+use crate::{calculate_status, Config, SqlError};
+use apalis_core::backend::{BackendExpose, Stat, WorkerState};
 use apalis_core::codec::json::JsonCodec;
 use apalis_core::error::{BoxDynError, Error};
 use apalis_core::layers::{Ack, AckLayer};
@@ -47,13 +48,13 @@ use apalis_core::notify::Notify;
 use apalis_core::poller::controller::Controller;
 use apalis_core::poller::stream::BackendStream;
 use apalis_core::poller::Poller;
-use apalis_core::request::{Parts, Request, RequestStream};
+use apalis_core::request::{Parts, Request, RequestStream, State};
 use apalis_core::response::Response;
 use apalis_core::storage::Storage;
 use apalis_core::task::namespace::Namespace;
 use apalis_core::task::task_id::TaskId;
 use apalis_core::worker::{Context, Event, Worker, WorkerId};
-use apalis_core::{Backend, Codec};
+use apalis_core::{backend::Backend, codec::Codec};
 use chrono::{DateTime, Utc};
 use futures::channel::mpsc;
 use futures::StreamExt;
@@ -690,9 +691,77 @@ impl<T> PostgresStorage<T> {
         Ok(())
     }
 }
+
+impl<J: 'static + Serialize + DeserializeOwned + Send + Sync + Unpin> BackendExpose<J>
+    for PostgresStorage<J>
+{
+    type Request = Request<J, SqlContext>;
+    type Error = SqlError;
+    async fn stats(&self) -> Result<Stat, Self::Error> {
+        let fetch_query = "SELECT
+            COUNT(1) FILTER (WHERE status = 'Pending') AS pending,
+            COUNT(1) FILTER (WHERE status = 'Running') AS running,
+            COUNT(1) FILTER (WHERE status = 'Done') AS done,
+            COUNT(1) FILTER (WHERE status = 'Retry') AS retry,
+            COUNT(1) FILTER (WHERE status = 'Failed') AS failed,
+            COUNT(1) FILTER (WHERE status = 'Killed') AS killed
+            FROM apalis.jobs WHERE job_type = $1";
+
+        let res: (i64, i64, i64, i64, i64, i64) = sqlx::query_as(fetch_query)
+            .bind(self.config().namespace())
+            .fetch_one(self.pool())
+            .await?;
+
+        Ok(Stat {
+            pending: res.0.try_into()?,
+            running: res.1.try_into()?,
+            dead: res.4.try_into()?,
+            failed: res.3.try_into()?,
+            success: res.2.try_into()?,
+        })
+    }
+
+    async fn list_jobs(
+        &self,
+        status: &State,
+        page: i32,
+    ) -> Result<Vec<Self::Request>, Self::Error> {
+        let status = status.to_string();
+        let fetch_query = "SELECT * FROM apalis.jobs WHERE status = $1 AND job_type = $2 ORDER BY done_at DESC, run_at DESC LIMIT 10 OFFSET $3";
+        let res: Vec<SqlRequest<serde_json::Value>> = sqlx::query_as(fetch_query)
+            .bind(status)
+            .bind(self.config().namespace())
+            .bind(((page - 1) * 10).to_string())
+            .fetch_all(self.pool())
+            .await?;
+        Ok(res
+            .into_iter()
+            .map(|j| {
+                let (req, ctx) = j.req.take_parts();
+                let req = JsonCodec::<serde_json::Value>::decode(req).unwrap();
+                Request::new_with_ctx(req, ctx)
+            })
+            .collect())
+    }
+
+    async fn list_workers(&self) -> Result<Vec<Worker<WorkerState>>, Self::Error> {
+        let fetch_query =
+            "SELECT id, layers, last_seen FROM apalis.workers WHERE worker_type = $1 ORDER BY last_seen DESC LIMIT 20 OFFSET $2";
+        let res: Vec<(String, String, i64)> = sqlx::query_as(fetch_query)
+            .bind(self.config().namespace())
+            .bind(0)
+            .fetch_all(self.pool())
+            .await?;
+        Ok(res
+            .into_iter()
+            .map(|w| Worker::new(WorkerId::new(w.0), WorkerState::new::<Self::Request>(w.1)))
+            .collect())
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::context::State;
+
     use crate::sql_storage_tests;
 
     use super::*;
diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs
index e692b65..f562cc4 100644
--- a/packages/apalis-sql/src/sqlite.rs
+++ b/packages/apalis-sql/src/sqlite.rs
@@ -1,18 +1,19 @@
 use crate::context::SqlContext;
-use crate::{calculate_status, Config};
+use crate::{calculate_status, Config, SqlError};
+use apalis_core::backend::{BackendExpose, Stat, WorkerState};
 use apalis_core::codec::json::JsonCodec;
 use apalis_core::error::Error;
 use apalis_core::layers::{Ack, AckLayer};
 use apalis_core::poller::controller::Controller;
 use apalis_core::poller::stream::BackendStream;
 use apalis_core::poller::Poller;
-use apalis_core::request::{Parts, Request, RequestStream};
+use apalis_core::request::{Parts, Request, RequestStream, State};
 use apalis_core::response::Response;
 use apalis_core::storage::Storage;
 use apalis_core::task::namespace::Namespace;
 use apalis_core::task::task_id::TaskId;
 use apalis_core::worker::{Context, Event, Worker, WorkerId};
-use apalis_core::{Backend, Codec};
+use apalis_core::{backend::Backend, codec::Codec};
 use async_stream::try_stream;
 use chrono::{DateTime, Utc};
 use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
@@ -543,13 +544,79 @@ impl<T, Res> Ack<T, Res> for SqliteStorage<T> {
     }
 }
 
+impl<J: 'static + Serialize + DeserializeOwned + Send + Sync + Unpin> BackendExpose<J>
+    for SqliteStorage<J, JsonCodec<String>>
+{
+    type Request = Request<J, SqlContext>;
+    type Error = SqlError;
+    async fn stats(&self) -> Result<Stat, Self::Error> {
+        let fetch_query = "SELECT
+            COUNT(1) FILTER (WHERE status = 'Pending') AS pending,
+            COUNT(1) FILTER (WHERE status = 'Running') AS running,
+            COUNT(1) FILTER (WHERE status = 'Done') AS done,
+            COUNT(1) FILTER (WHERE status = 'Retry') AS retry,
+            COUNT(1) FILTER (WHERE status = 'Failed') AS failed,
+            COUNT(1) FILTER (WHERE status = 'Killed') AS killed
+            FROM Jobs WHERE job_type = ?";
+
+        let res: (i64, i64, i64, i64, i64, i64) = sqlx::query_as(fetch_query)
+            .bind(self.get_config().namespace())
+            .fetch_one(self.pool())
+            .await?;
+
+        Ok(Stat {
+            pending: res.0.try_into()?,
+            running: res.1.try_into()?,
+            dead: res.4.try_into()?,
+            failed: res.3.try_into()?,
+            success: res.2.try_into()?,
+        })
+    }
+
+    async fn list_jobs(
+        &self,
+        status: &State,
+        page: i32,
+    ) -> Result<Vec<Self::Request>, Self::Error> {
+        let status = status.to_string();
+        let fetch_query = "SELECT * FROM Jobs WHERE status = ? AND job_type = ? ORDER BY done_at DESC, run_at DESC LIMIT 10 OFFSET ?";
+        let res: Vec<SqlRequest<String>> = sqlx::query_as(fetch_query)
+            .bind(status)
+            .bind(self.get_config().namespace())
+            .bind(((page - 1) * 10).to_string())
+            .fetch_all(self.pool())
+            .await?;
+        Ok(res
+            .into_iter()
+            .map(|j| {
+                let (req, ctx) = j.req.take_parts();
+                let req = JsonCodec::<String>::decode(req).unwrap();
+                Request::new_with_ctx(req, ctx)
+            })
+            .collect())
+    }
+
+    async fn list_workers(&self) -> Result<Vec<Worker<WorkerState>>, Self::Error> {
+        let fetch_query =
+            "SELECT id, layers, last_seen FROM Workers WHERE worker_type = ? ORDER BY last_seen DESC LIMIT 20 OFFSET ?";
+        let res: Vec<(String, String, i64)> = sqlx::query_as(fetch_query)
+            .bind(self.get_config().namespace())
+            .bind(0)
+            .fetch_all(self.pool())
+            .await?;
+        Ok(res
+            .into_iter()
+            .map(|w| Worker::new(WorkerId::new(w.0), WorkerState::new::<Self::Request>(w.1)))
+            .collect())
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::context::State;
     use crate::sql_storage_tests;
 
     use super::*;
+    use apalis_core::request::State;
     use apalis_core::test_utils::DummyService;
     use chrono::Utc;
     use email_service::example_good_email;
diff --git a/src/lib.rs b/src/lib.rs
index 2e3a3e8..f4bcf58 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -70,7 +70,12 @@ pub mod layers;
 pub mod prelude {
     pub use crate::layers::WorkerBuilderExt;
     pub use apalis_core::{
+        backend::Backend,
+        backend::BackendExpose,
+        backend::Stat,
+        backend::WorkerState,
         builder::{WorkerBuilder, WorkerFactory, WorkerFactoryFn},
+        codec::Codec,
         data::Extensions,
         error::{BoxDynError, Error},
         layers::extensions::{AddExtension, Data},
@@ -80,6 +85,7 @@ pub mod prelude {
         notify::Notify,
         poller::stream::BackendStream,
         poller::{controller::Controller, Poller},
+        request::State,
         request::{Request, RequestStream},
         response::IntoResponse,
         service_fn::{service_fn, FromRequest, ServiceFn},
@@ -87,6 +93,5 @@ pub mod prelude {
         task::attempt::Attempt,
         task::task_id::TaskId,
         worker::{Context, Event, Ready, Worker, WorkerError, WorkerId},
-        Backend, Codec,
     };
 }
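// Reviewer sketch, not part of the patch: a toy `BackendExpose` implementation that makes
// the contract followed by the storages above explicit — `stats()` reports per-state
// counts and `list_jobs` pages in tens with 1-indexed pages. `InMemoryBackend` is
// hypothetical; only pending jobs are tracked, so every other state lists as empty.
use apalis_core::backend::{BackendExpose, Stat, WorkerState};
use apalis_core::request::{Request, State};
use apalis_core::worker::Worker;
use std::convert::Infallible;

struct InMemoryBackend<T> {
    pending: Vec<T>,
}

impl<T: Clone + Send + Sync + 'static> BackendExpose<T> for InMemoryBackend<T> {
    type Request = Request<T, ()>;
    type Error = Infallible;

    async fn stats(&self) -> Result<Stat, Self::Error> {
        Ok(Stat {
            pending: self.pending.len(),
            ..Default::default()
        })
    }

    async fn list_jobs(
        &self,
        status: &State,
        page: i32,
    ) -> Result<Vec<Self::Request>, Self::Error> {
        // Only pending jobs exist in this toy backend.
        if *status != State::Pending {
            return Ok(Vec::new());
        }
        // Mirror the 10-per-page, 1-indexed windowing used by the SQL and Redis impls.
        let start = (page.max(1) as usize - 1) * 10;
        Ok(self
            .pending
            .iter()
            .skip(start)
            .take(10)
            .cloned()
            .map(Request::new)
            .collect())
    }

    async fn list_workers(&self) -> Result<Vec<Worker<WorkerState>>, Self::Error> {
        Ok(Vec::new())
    }
}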