Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return of exposing backends to help in building apis #457

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ members = [
"examples/catch-panic",
"examples/graceful-shutdown",
"examples/unmonitored-worker",
"examples/fn-args", "examples/persisted-cron",
"examples/fn-args", "examples/persisted-cron", "examples/rest-api",
]


Expand Down
15 changes: 15 additions & 0 deletions examples/rest-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "rest-api"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
anyhow = "1"
apalis = { path = "../../" }
apalis-redis = { path = "../../packages/apalis-redis" }
serde = "1"
env_logger = "0.10"
actix-web = "4"
futures = "0.3"
email-service = { path = "../email-service" }
108 changes: 108 additions & 0 deletions examples/rest-api/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use actix_web::rt::signal;
use actix_web::{web, App, HttpResponse, HttpServer};
use anyhow::Result;
use apalis::prelude::*;

use apalis_redis::RedisStorage;
use futures::future;

use email_service::{send_email, Email};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Debug)]
struct Filter {
#[serde(default)]
pub status: State,
#[serde(default = "default_page")]
pub page: i32,
}

fn default_page() -> i32 {
1
}

#[derive(Debug, Serialize, Deserialize)]
struct GetJobsResult<T> {
pub stats: Stat,
pub jobs: Vec<T>,
}

async fn push_job(job: web::Json<Email>, storage: web::Data<RedisStorage<Email>>) -> HttpResponse {
let mut storage = (**storage).clone();
let res = storage.push(job.into_inner()).await;
match res {
Ok(parts) => {
HttpResponse::Ok().body(format!("Job with ID [{}] added to queue", parts.task_id))
}
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
}
}

async fn get_jobs(
storage: web::Data<RedisStorage<Email>>,
filter: web::Query<Filter>,
) -> HttpResponse {
let stats = storage.stats().await.unwrap_or_default();
let res = storage.list_jobs(&filter.status, filter.page).await;
match res {
Ok(jobs) => HttpResponse::Ok().json(GetJobsResult { stats, jobs }),
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
}
}

async fn get_workers(storage: web::Data<RedisStorage<Email>>) -> HttpResponse {
let workers = storage.list_workers().await;
match workers {
Ok(workers) => HttpResponse::Ok().json(workers),
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
}
}

async fn get_job(
job_id: web::Path<TaskId>,
storage: web::Data<RedisStorage<Email>>,
) -> HttpResponse {
let mut storage = (**storage).clone();

let res = storage.fetch_by_id(&job_id).await;
match res {
Ok(Some(job)) => HttpResponse::Ok().json(job),
Ok(None) => HttpResponse::NotFound().finish(),
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
}
}

#[actix_web::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();

let conn = apalis_redis::connect("redis://127.0.0.1/").await?;
let storage = RedisStorage::new(conn);
let data = web::Data::new(storage.clone());
let http = async {
HttpServer::new(move || {
App::new()
.app_data(data.clone())
.route("/", web::get().to(get_jobs)) // Fetch jobs in queue
.route("/workers", web::get().to(get_workers)) // Fetch workers
.route("/job", web::put().to(push_job)) // Allow add jobs via api
.route("/job/{job_id}", web::get().to(get_job)) // Allow fetch specific job
})
.bind("127.0.0.1:8000")?
.run()
.await?;
Ok(())
};
let worker = Monitor::new()
.register({
WorkerBuilder::new("tasty-avocado")
.enable_tracing()
.backend(storage)
.build_fn(send_email)
})
.run_with_signal(signal::ctrl_c());

future::try_join(http, worker).await?;
Ok(())
}
92 changes: 92 additions & 0 deletions packages/apalis-core/src/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{any::type_name, future::Future};

use futures::Stream;
use serde::{Deserialize, Serialize};
use tower::Service;

use crate::{
poller::Poller,
request::State,
worker::{Context, Worker},
};

/// A backend represents a task source
/// Both [`Storage`] and [`MessageQueue`] need to implement it for workers to be able to consume tasks
///
/// [`Storage`]: crate::storage::Storage
/// [`MessageQueue`]: crate::mq::MessageQueue
pub trait Backend<Req, Res> {
/// The stream to be produced by the backend
type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;

/// Returns the final decoration of layers
type Layer;

/// Returns a poller that is ready for streaming
fn poll<Svc: Service<Req, Response = Res>>(
self,
worker: &Worker<Context>,
) -> Poller<Self::Stream, Self::Layer>;
}

/// Represents functionality that allows reading of jobs and stats from a backend
/// Some backends esp MessageQueues may not currently implement this
pub trait BackendExpose<T>
where
Self: Sized,
{
/// The request type being handled by the backend
type Request;
/// The error returned during reading jobs and stats
type Error;
/// List all Workers that are working on a backend
fn list_workers(
&self,
) -> impl Future<Output = Result<Vec<Worker<WorkerState>>, Self::Error>> + Send;

/// Returns the counts of jobs in different states
fn stats(&self) -> impl Future<Output = Result<Stat, Self::Error>> + Send;

/// Fetch jobs persisted in a backend
fn list_jobs(
&self,
status: &State,
page: i32,
) -> impl Future<Output = Result<Vec<Self::Request>, Self::Error>> + Send;
}

/// Represents the current statistics of a backend
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct Stat {
/// Represents pending tasks
pub pending: usize,
/// Represents running tasks
pub running: usize,
/// Represents dead tasks
pub dead: usize,
/// Represents failed tasks
pub failed: usize,
/// Represents successful tasks
pub success: usize,
}

/// A serializable version of a worker's state.
#[derive(Debug, Serialize, Deserialize)]
pub struct WorkerState {
/// Type of task being consumed by the worker, useful for display and filtering
pub r#type: String,
/// The type of job stream
pub source: String,
// TODO: // The layers that were loaded for worker.
// TODO: // pub layers: Vec<Layer>,
// TODO: // last_seen: Timestamp,
}
impl WorkerState {
/// Build a new state
pub fn new<S>(r#type: String) -> Self {
Self {
r#type,
source: type_name::<S>().to_string(),
}
}
}
2 changes: 1 addition & 1 deletion packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use tower::{
};

use crate::{
backend::Backend,
error::Error,
layers::extensions::Data,
request::Request,
service_fn::service_fn,
service_fn::ServiceFn,
worker::{Ready, Worker, WorkerId},
Backend,
};

/// Allows building a [`Worker`].
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/codec/json.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::marker::PhantomData;

use crate::Codec;
use crate::codec::Codec;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down
20 changes: 20 additions & 0 deletions packages/apalis-core/src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
use serde::{Deserialize, Serialize};

use crate::error::BoxDynError;

/// A codec allows backends to encode and decode data
pub trait Codec {
/// The mode of storage by the codec
type Compact;
/// Error encountered by the codec
type Error: Into<BoxDynError>;
/// The encoding method
fn encode<I>(input: I) -> Result<Self::Compact, Self::Error>
where
I: Serialize;
/// The decoding method
fn decode<O>(input: Self::Compact) -> Result<O, Self::Error>
where
O: for<'de> Deserialize<'de>;
}

/// Encoding for tasks using json
#[cfg(feature = "json")]
pub mod json;
46 changes: 4 additions & 42 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,11 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
//! # apalis-core
//! Utilities for building job and message processing tools.
use error::BoxDynError;
use futures::Stream;
use poller::Poller;
use serde::{Deserialize, Serialize};
use tower::Service;
use worker::{Context, Worker};

/// Represent utilities for creating worker instances.
pub mod builder;

/// Represents a task source eg Postgres or Redis
pub mod backend;
/// Includes all possible error types.
pub mod error;
/// Represents middleware offered through [`tower`]
Expand Down Expand Up @@ -66,40 +62,6 @@ pub mod task;
/// Codec for handling data
pub mod codec;

/// A backend represents a task source
/// Both [`Storage`] and [`MessageQueue`] need to implement it for workers to be able to consume tasks
///
/// [`Storage`]: crate::storage::Storage
/// [`MessageQueue`]: crate::mq::MessageQueue
pub trait Backend<Req, Res> {
/// The stream to be produced by the backend
type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;

/// Returns the final decoration of layers
type Layer;

/// Returns a poller that is ready for streaming
fn poll<Svc: Service<Req, Response = Res>>(
self,
worker: &Worker<Context>,
) -> Poller<Self::Stream, Self::Layer>;
}
/// A codec allows backends to encode and decode data
pub trait Codec {
/// The mode of storage by the codec
type Compact;
/// Error encountered by the codec
type Error: Into<BoxDynError>;
/// The encoding method
fn encode<I>(input: I) -> Result<Self::Compact, Self::Error>
where
I: Serialize;
/// The decoding method
fn decode<O>(input: Self::Compact) -> Result<O, Self::Error>
where
O: for<'de> Deserialize<'de>;
}

/// Sleep utilities
#[cfg(feature = "sleep")]
pub async fn sleep(duration: std::time::Duration) {
Expand Down Expand Up @@ -162,11 +124,11 @@ pub mod interval {
#[cfg(feature = "test-utils")]
/// Test utilities that allows you to test backends
pub mod test_utils {
use crate::backend::Backend;
use crate::error::BoxDynError;
use crate::request::Request;
use crate::task::task_id::TaskId;
use crate::worker::{Worker, WorkerId};
use crate::Backend;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::future::BoxFuture;
use futures::stream::{Stream, StreamExt};
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-core/src/memory.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{
backend::Backend,
mq::MessageQueue,
poller::Poller,
poller::{controller::Controller, stream::BackendStream},
request::{Request, RequestStream},
worker::{self, Worker},
Backend, Poller,
};
use futures::{
channel::mpsc::{channel, Receiver, Sender},
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use tower::{Layer, Service};
pub mod shutdown;

use crate::{
backend::Backend,
error::BoxDynError,
request::Request,
worker::{Context, Event, EventHandler, Ready, Worker, WorkerId},
Backend,
};

use self::shutdown::Shutdown;
Expand Down
Loading
Loading