Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Introduce simple ability to pipe cron jobs to any backend #455

Merged
merged 2 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ members = [
"examples/catch-panic",
"examples/graceful-shutdown",
"examples/unmonitored-worker",
"examples/fn-args",
"examples/fn-args", "examples/persisted-cron",
]


Expand Down
26 changes: 26 additions & 0 deletions examples/persisted-cron/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Example: piping cron-generated jobs into a persistent (SQLite) backend.
[package]
name = "persisted-cron"
version = "0.1.0"
# Inherit edition/repository from the workspace root manifest.
edition.workspace = true
repository.workspace = true

[dependencies]
anyhow = "1"
# Core apalis with only the features this example needs.
apalis = { path = "../../", default-features = false, features = [
    "tokio-comp",
    "tracing",
    "limit",
    "catch-panic"
] }
# Cron scheduling (the job producer) and SQL storage (the job sink).
apalis-cron = { path = "../../packages/apalis-cron" }
apalis-sql = { path = "../../packages/apalis-sql", features = ["sqlite", "tokio-comp"] }
tokio = { version = "1", features = ["full"] }
serde = "1"
tracing-subscriber = "0.3.11"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
pin-project-lite = "0.2.9"
tower = { version = "0.4", features = ["load-shed"] }

# Expanded-table form so default features can be disabled for tracing.
[dependencies.tracing]
default-features = false
version = "0.1"
57 changes: 57 additions & 0 deletions examples/persisted-cron/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use apalis::prelude::*;

use apalis_cron::CronStream;
use apalis_cron::Schedule;
use apalis_sql::sqlite::SqliteStorage;
use apalis_sql::sqlx::SqlitePool;
use chrono::{DateTime, Utc};
use serde::Deserialize;
use serde::Serialize;
use std::str::FromStr;
use std::time::Duration;

// A stand-in "downstream" service: represents whatever real system the
// worker would hand jobs to in production.
#[derive(Clone)]
struct FakeService;
impl FakeService {
    // Pretend to process one reminder; `dbg!` dumps the tick timestamp
    // (with file/line info) to stderr.
    fn execute(&self, item: Reminder) {
        dbg!(&item.0);
    }
}

/// The job payload: a newtype over the cron tick timestamp. It is serialized
/// into the storage when a tick fires and deserialized when a worker picks
/// the job back up.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct Reminder(DateTime<Utc>);

impl From<DateTime<Utc>> for Reminder {
    /// Builds a reminder straight from the tick time the cron stream emits.
    fn from(timestamp: DateTime<Utc>) -> Self {
        Self(timestamp)
    }
}
/// Worker handler: invoked once per consumed job with the `Reminder` payload
/// and the shared `FakeService` injected via worker `.data(...)`.
async fn send_reminder(job: Reminder, svc: Data<FakeService>) {
    svc.execute(job);
}

/// Example entry point: generate jobs from a cron schedule, persist each tick
/// into SQLite, and consume them with a rate-limited worker.
#[tokio::main]
async fn main() {
    // Verbose example logging, but silence sqlx's per-query output.
    std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
    tracing_subscriber::fmt::init();

    // A schedule that fires every second.
    let every_second = Schedule::from_str("1/1 * * * * *").unwrap();
    let ticks = CronStream::new(every_second);

    // Back the jobs with an in-memory SQLite database (run migrations first).
    let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
    SqliteStorage::setup(&pool)
        .await
        .expect("unable to run migrations for sqlite");
    let storage = SqliteStorage::new(pool);

    // Pipe every cron tick into the storage; the worker consumes from it.
    let backend = ticks.pipe_to_storage(storage);

    // At most one job every two seconds, with tracing and injected state.
    let worker = WorkerBuilder::new("morning-cereal")
        .enable_tracing()
        .rate_limit(1, Duration::from_secs(2))
        .data(FakeService)
        .backend(backend)
        .build_fn(send_reminder);

    Monitor::new().register(worker).run().await.unwrap();
}
1 change: 1 addition & 0 deletions packages/apalis-core/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl<T: Send + 'static + Sync, Res> Backend<Request<T, ()>, Res> for MemoryStora
stream: BackendStream::new(stream, self.controller),
heartbeat: Box::pin(futures::future::pending()),
layer: Identity::new(),
_priv: (),
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions packages/apalis-core/src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ pub mod stream;

/// A poller type that allows fetching from a stream and a heartbeat future that can be used to do periodic tasks
pub struct Poller<S, L = Identity> {
pub(crate) stream: S,
pub(crate) heartbeat: BoxFuture<'static, ()>,
pub(crate) layer: L,
/// The stream of jobs
pub stream: S,
/// The heartbeat for the backend
pub heartbeat: BoxFuture<'static, ()>,
/// The tower middleware provided by the backend
pub layer: L,
pub(crate) _priv: (),
}

impl<S> Poller<S, Identity> {
/// Build a new poller
pub fn new(stream: S, heartbeat: impl Future<Output = ()> + Send + 'static) -> Self {
Self {
stream,
heartbeat: heartbeat.boxed(),
layer: Identity::new(),
}
Self::new_with_layer(stream, heartbeat, Identity::new())
}

/// Build a poller with layer
Expand All @@ -34,6 +34,7 @@ impl<S> Poller<S, Identity> {
stream,
heartbeat: heartbeat.boxed(),
layer,
_priv: (),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/apalis-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl<T, Res, Ctx> Backend<Request<T, Ctx>, Res> for RequestStream<Request<T, Ctx
stream: self,
heartbeat: Box::pin(futures::future::pending()),
layer: Identity::new(),
_priv: (),
}
}
}
68 changes: 68 additions & 0 deletions packages/apalis-cron/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,26 @@
//! }
//! ```

use apalis_core::error::BoxDynError;
use apalis_core::layers::Identity;
use apalis_core::mq::MessageQueue;
use apalis_core::poller::Poller;
use apalis_core::request::RequestStream;
use apalis_core::storage::Storage;
use apalis_core::task::namespace::Namespace;
use apalis_core::worker::{Context, Worker};
use apalis_core::Backend;
use apalis_core::{error::Error, request::Request};
use chrono::{DateTime, TimeZone, Utc};
pub use cron::Schedule;
use futures::StreamExt;
use pipe::CronPipe;
use std::marker::PhantomData;
use std::sync::Arc;

/// Allows piping of cronjobs to a Storage or MessageQueue
pub mod pipe;

/// Represents a stream from a cron schedule with a timezone
#[derive(Clone, Debug)]
pub struct CronStream<J, Tz> {
Expand Down Expand Up @@ -133,6 +141,66 @@ where
};
Box::pin(stream)
}

/// Push cron job events to a storage and get a consumable Backend
///
/// Each tick from the cron stream is persisted via `Storage::push`; the
/// returned `CronPipe` wraps the storage so a worker can consume the
/// persisted jobs from it.
pub fn pipe_to_storage<S, Ctx>(self, storage: S) -> CronPipe<S>
where
    S: Storage<Job = Req, Context = Ctx> + Clone + Send + Sync + 'static,
    S::Error: std::error::Error + Send + Sync + 'static,
{
    let stream = self
        .into_stream()
        .then({
            // Clone once into the closure; clone again per tick because
            // `Storage::push` takes `&mut self`.
            let storage = storage.clone();
            move |res| {
                let mut storage = storage.clone();
                async move {
                    match res {
                        // A tick fired: persist the request's arguments,
                        // boxing any storage error for the pipe stream.
                        Ok(Some(req)) => storage
                            .push(req.args)
                            .await
                            .map(|_| ())
                            .map_err(|e| Box::new(e) as BoxDynError),
                        // NOTE(review): this arm also covers `Err(_)` from
                        // the cron stream, so stream errors are silently
                        // dropped as success — confirm that is intentional.
                        _ => Ok(()),
                    }
                }
            }
        })
        .boxed();

    CronPipe {
        stream,
        inner: storage,
    }
}
/// Push cron job events to a message queue and get a consumable Backend
///
/// Mirrors `pipe_to_storage`, but enqueues each tick through
/// `MessageQueue::enqueue` instead of persisting it to a storage.
pub fn pipe_to_mq<Mq>(self, mq: Mq) -> CronPipe<Mq>
where
    Mq: MessageQueue<Req> + Clone + Send + Sync + 'static,
    Mq::Error: std::error::Error + Send + Sync + 'static,
{
    let stream = self
        .into_stream()
        .then({
            // Clone once into the closure; clone again per tick because
            // `MessageQueue::enqueue` takes `&mut self`.
            let mq = mq.clone();
            move |res| {
                let mut mq = mq.clone();
                async move {
                    match res {
                        // A tick fired: enqueue the request's arguments,
                        // boxing any queue error for the pipe stream.
                        Ok(Some(req)) => mq
                            .enqueue(req.args)
                            .await
                            .map(|_| ())
                            .map_err(|e| Box::new(e) as BoxDynError),
                        // NOTE(review): this arm also covers `Err(_)` from
                        // the cron stream, so stream errors are silently
                        // dropped as success — confirm that is intentional.
                        _ => Ok(()),
                    }
                }
            }
        })
        .boxed();

    CronPipe { stream, inner: mq }
}
}

impl<Req, Tz, Res> Backend<Request<Req, ()>, Res> for CronStream<Req, Tz>
Expand Down
77 changes: 77 additions & 0 deletions packages/apalis-cron/src/pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use apalis_core::error::BoxDynError;
use apalis_core::request::BoxStream;
use apalis_core::Backend;
use apalis_core::{poller::Poller, request::Request, worker::Context, worker::Worker};
use futures::StreamExt;
use std::{error, fmt};
use tower::Service;

/// A generic Pipe that wraps an inner type along with a `RequestStream`.
pub struct CronPipe<Inner> {
    // Side-channel stream that performs the actual push/enqueue work; it is
    // driven as part of the poller heartbeat and yields `Err` when a push
    // into the inner backend fails.
    pub(crate) stream: BoxStream<'static, Result<(), BoxDynError>>,
    // The backend (storage or message queue) the worker actually consumes
    // jobs from.
    pub(crate) inner: Inner,
}

impl<Inner: fmt::Debug> fmt::Debug for CronPipe<Inner> {
    /// Formats the pipe for debugging.
    ///
    /// The struct is reported under its actual type name, `CronPipe`
    /// (previously mislabeled as "Pipe"); the stream field is shown as a
    /// placeholder because the boxed stream does not implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CronPipe")
            .field("stream", &"<RequestStream<()>>")
            .field("inner", &self.inner)
            .finish()
    }
}

// `CronPipe` is a backend by delegation: the worker polls the inner backend,
// while the pipe's push stream is driven alongside the inner heartbeat.
impl<T, Res, Ctx, Inner> Backend<Request<T, Ctx>, Res> for CronPipe<Inner>
where
    Inner: Backend<Request<T, Ctx>, Res>,
{
    // Jobs are served straight from the inner backend's stream.
    type Stream = Inner::Stream;

    // The inner backend's middleware is applied unchanged.
    type Layer = Inner::Layer;

    fn poll<Svc: Service<Request<T, Ctx>, Response = Res>>(
        mut self,
        worker: &Worker<Context>,
    ) -> Poller<Self::Stream, Self::Layer> {
        // Drain the pipe stream until it ends; each item is the result of one
        // push/enqueue. NOTE(review): `is_some()` discards the `Err` payload,
        // so failed pushes are not surfaced here — confirm intended.
        let pipe_heartbeat = async move { while (self.stream.next().await).is_some() {} };
        let inner = self.inner.poll::<Svc>(worker);
        let heartbeat = inner.heartbeat;

        // Keep the inner backend's stream and layer, but run the pipe drain
        // concurrently with the inner heartbeat.
        Poller::new_with_layer(
            inner.stream,
            async {
                futures::join!(heartbeat, pipe_heartbeat);
            },
            inner.layer,
        )
    }
}

/// A cron error
#[derive(Debug)]
pub struct PipeError {
    // The specific failure; currently only `EmptyStream`. The field is
    // private with no accessor — NOTE(review): consider exposing a `kind()`
    // getter if callers need to match on it.
    kind: PipeErrorKind,
}

/// The kind of pipe error that occurred
///
/// Wrapped by [`PipeError`] via its `From` impl.
#[derive(Debug)]
pub enum PipeErrorKind {
    /// The cron stream provided a None
    EmptyStream,
}

impl fmt::Display for PipeError {
    /// Writes a human-readable description of the pipe failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self.kind {
            PipeErrorKind::EmptyStream => "The cron stream provided a None",
        };
        f.write_str(description)
    }
}

// `PipeError` has no underlying source error, so the trait's default method
// implementations are sufficient.
impl error::Error for PipeError {}

impl From<PipeErrorKind> for PipeError {
fn from(kind: PipeErrorKind) -> PipeError {
PipeError { kind }
}
}
Loading