Some refactoring in jobs module #283

Merged
merged 1 commit on Nov 26, 2024
8 changes: 5 additions & 3 deletions Cargo.lock

(generated file; diff not rendered)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -60,6 +60,7 @@ tokio-postgres = { version = "0.7.12", features = [
"with-serde_json-1",
"with-time-0_3",
] }
tokio-util = { version = "0.7.12", features = ["rt"] }
tower = "0.5.1"
tower-http = { version = "0.6.2", features = ["auth", "fs", "set-header", "trace"] }
tracing = "0.1.40"
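The tokio-util crate added above provides CancellationToken, which the rest of this PR adopts in place of the previous broadcast-based stop channel. A minimal standalone sketch of the token's semantics (not code from this repository; assumes a tokio runtime with the usual features enabled): every clone of a token observes a single cancel() call.

use std::time::Duration;

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.clone();

    // The worker exits as soon as any clone of the token is cancelled.
    let worker = tokio::spawn(async move {
        tokio::select! {
            () = child.cancelled() => println!("worker stopping"),
            () = tokio::time::sleep(Duration::from_secs(3600)) => {}
        }
    });

    token.cancel();
    worker.await.unwrap();
}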
6 changes: 3 additions & 3 deletions clowarden-cli/src/main.rs
@@ -22,6 +22,9 @@ use clowarden_core::{
},
};

/// Environment variable containing Github token.
const GITHUB_TOKEN: &str = "GITHUB_TOKEN";

#[derive(Parser)]
#[command(
version,
@@ -83,9 +86,6 @@ struct GenerateArgs {
output_file: PathBuf,
}

/// Environment variable containing Github token.
const GITHUB_TOKEN: &str = "GITHUB_TOKEN";

#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
4 changes: 2 additions & 2 deletions clowarden-core/src/services/mod.rs
@@ -1,7 +1,7 @@
//! This module defines some types and traits that service handlers
//! implementations will rely upon.

use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};

use anyhow::Result;
use as_any::AsAny;
@@ -27,7 +27,7 @@ pub trait ServiceHandler {
}

/// Type alias to represent a service handler trait object.
pub type DynServiceHandler = Box<dyn ServiceHandler + Send + Sync>;
pub type DynServiceHandler = Arc<dyn ServiceHandler + Send + Sync>;

/// Represents a summary of changes detected in the service's state as defined
/// in the configuration from the base to the head reference.
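Context for the Box to Arc switch above: the refactored jobs handler hands a copy of the services map to each organization worker, and a HashMap of Box<dyn Trait> values cannot be cloned, while Arc<dyn Trait> values can. A small self-contained sketch of that difference (hypothetical name() method, not the real trait):

use std::{collections::HashMap, sync::Arc};

trait ServiceHandler: Send + Sync {
    fn name(&self) -> &'static str;
}

/// Same shape as the alias in the diff above.
type DynServiceHandler = Arc<dyn ServiceHandler + Send + Sync>;

struct GitHubHandler;

impl ServiceHandler for GitHubHandler {
    fn name(&self) -> &'static str {
        "github"
    }
}

fn main() {
    let mut services: HashMap<&'static str, DynServiceHandler> = HashMap::new();
    services.insert("github", Arc::new(GitHubHandler));

    // Cloning the map only bumps reference counts; with Box<dyn ...> values
    // this line would not compile because boxed trait objects are not Clone.
    let per_worker_copy = services.clone();
    assert_eq!(per_worker_copy["github"].name(), "github");
}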
1 change: 1 addition & 0 deletions clowarden-server/Cargo.toml
@@ -36,6 +36,7 @@ thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
tokio-postgres = { workspace = true }
tokio-util = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
6 changes: 3 additions & 3 deletions clowarden-server/src/github.rs
@@ -20,6 +20,9 @@ use thiserror::Error;

use clowarden_core::cfg::{GitHubApp, Organization};

/// Name used for the check run in GitHub.
const CHECK_RUN_NAME: &str = "CLOWarden";

/// Trait that defines some operations a GH implementation must support.
#[async_trait]
#[cfg_attr(test, automock)]
@@ -165,9 +168,6 @@ pub(crate) enum PullRequestEventAction {
Other,
}

/// Name used for the check run in GitHub.
const CHECK_RUN_NAME: &str = "CLOWarden";

/// Helper function to create a new ChecksCreateRequest instance.
pub(crate) fn new_checks_create_request(
head_sha: String,
140 changes: 68 additions & 72 deletions clowarden-server/src/jobs.rs
@@ -1,7 +1,7 @@
//! This module defines the types and functionality needed to schedule and
//! process jobs.

use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, time::Duration};

use ::time::OffsetDateTime;
use anyhow::{Error, Result};
@@ -10,10 +10,11 @@ use futures::future::{self, JoinAll};
use octorust::types::{ChecksCreateRequestConclusion, JobStatus, PullRequestData};
use serde::{Deserialize, Serialize};
use tokio::{
sync::{broadcast, mpsc},
sync::mpsc,
task::JoinHandle,
time::{self, sleep, MissedTickBehavior},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, instrument};

use self::core::github::Source;
@@ -31,6 +31,9 @@ use crate::{
tmpl,
};

/// How often periodic reconcile jobs should be scheduled (in seconds).
const RECONCILE_FREQUENCY: u64 = 60 * 60; // Every hour

/// Represents a job to be executed.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -117,84 +121,83 @@ impl ValidateInput {
}
}

/// A jobs handler is in charge of executing the received jobs.
pub(crate) struct Handler {
/// A jobs handler is in charge of executing the received jobs. It will create
/// a worker for each organization, plus an additional task to route jobs to
/// the corresponding organization worker. All tasks will stop when the
/// cancellation token is cancelled.
pub(crate) fn handler(
db: &DynDB,
gh: &DynGH,
ghc: &core::github::DynGH,
services: &HashMap<ServiceName, DynServiceHandler>,
mut jobs_rx: mpsc::UnboundedReceiver<Job>,
cancel_token: CancellationToken,
orgs: Vec<Organization>,
) -> JoinAll<JoinHandle<()>> {
let mut handles = Vec::with_capacity(orgs.len() + 1);
let mut orgs_jobs_tx_channels = HashMap::new();

// Create a worker for each organization
for org in orgs {
let (org_jobs_tx, org_jobs_rx) = mpsc::unbounded_channel();
orgs_jobs_tx_channels.insert(org.name, org_jobs_tx);
let org_worker = OrgWorker::new(db.clone(), gh.clone(), ghc.clone(), services.clone());
handles.push(org_worker.run(org_jobs_rx, cancel_token.clone()));
}

// Create a worker to route jobs to the corresponding org worker
let jobs_router = tokio::spawn(async move {
loop {
tokio::select! {
biased;

// Pick next job from the queue and send it to the corresponding org worker
Some(job) = jobs_rx.recv() => {
if let Some(org_jobs_tx) = orgs_jobs_tx_channels.get(job.org_name()) {
_ = org_jobs_tx.send(job);
}
}

// Exit if the handler has been asked to stop
() = cancel_token.cancelled() => break,
}
}
});
handles.push(jobs_router);

future::join_all(handles)
}

/// An organization worker is in charge of processing jobs for a given
/// organization.
struct OrgWorker {
db: DynDB,
gh: DynGH,
ghc: core::github::DynGH,
services: HashMap<ServiceName, DynServiceHandler>,
}

impl Handler {
/// Create a new handler instance.
pub(crate) fn new(
impl OrgWorker {
/// Create a new organization worker instance.
fn new(
db: DynDB,
gh: DynGH,
ghc: core::github::DynGH,
services: HashMap<ServiceName, DynServiceHandler>,
) -> Arc<Self> {
Arc::new(Self {
) -> Self {
Self {
db,
gh,
ghc,
services,
})
}

/// Spawn some tasks to process jobs received on the jobs channel. We will
/// create one worker per organization, plus an additional task to route
/// jobs to the corresponding organization worker. All tasks will stop when
/// notified on the stop channel provided.
pub(crate) fn start(
self: Arc<Self>,
mut jobs_rx: mpsc::UnboundedReceiver<Job>,
stop_tx: &broadcast::Sender<()>,
orgs: Vec<Organization>,
) -> JoinAll<JoinHandle<()>> {
let mut handles = Vec::with_capacity(orgs.len() + 1);
let mut orgs_jobs_tx_channels = HashMap::new();

// Create a worker for each organization
for org in orgs {
let (org_jobs_tx, org_jobs_rx) = mpsc::unbounded_channel();
orgs_jobs_tx_channels.insert(org.name, org_jobs_tx);
let org_worker = self.clone().organization_worker(org_jobs_rx, stop_tx.subscribe());
handles.push(org_worker);
}

// Create a worker to route jobs to the corresponding org worker
let mut stop_rx = stop_tx.subscribe();
let jobs_router = tokio::spawn(async move {
loop {
tokio::select! {
biased;

// Pick next job from the queue and send it to the corresponding org worker
Some(job) = jobs_rx.recv() => {
if let Some(org_jobs_tx) = orgs_jobs_tx_channels.get(job.org_name()) {
_ = org_jobs_tx.send(job);
}
}

// Exit if the handler has been asked to stop
_ = stop_rx.recv() => {
break
}
}
}
});
handles.push(jobs_router);

future::join_all(handles)
}

/// Spawn a worker that will take care of processing jobs for a given
/// organization. The worker will stop when notified on the stop channel
/// provided.
fn organization_worker(
self: Arc<Self>,
/// Run organization worker.
fn run(
self,
mut org_jobs_rx: mpsc::UnboundedReceiver<Job>,
mut stop_rx: broadcast::Receiver<()>,
cancel_token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
@@ -210,9 +213,7 @@
}

// Exit if the handler has been asked to stop
_ = stop_rx.recv() => {
break
}
() = cancel_token.cancelled() => break,
}
}
})
@@ -349,14 +350,11 @@ impl Handler {
}
}

/// How often periodic reconcile jobs should be scheduled (in seconds).
const RECONCILE_FREQUENCY: u64 = 60 * 60;

/// A jobs scheduler is in charge of scheduling the execution of some jobs
/// periodically.
pub(crate) fn scheduler(
jobs_tx: mpsc::UnboundedSender<Job>,
mut stop_rx: broadcast::Receiver<()>,
cancel_token: CancellationToken,
orgs: Vec<Organization>,
) -> JoinAll<JoinHandle<()>> {
let scheduler = tokio::spawn(async move {
@@ -369,9 +367,7 @@
biased;

// Exit if the scheduler has been asked to stop
_ = stop_rx.recv() => {
break
}
() = cancel_token.cancelled() => break,

// Schedule reconcile job for each of the registered organizations
_ = reconcile.tick() => {
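The net shape of the new free-standing handler function above: one unbounded mpsc queue per organization, one router task that fans incoming jobs out by org_name, and a shared CancellationToken that stops every task. A condensed, self-contained sketch of that pattern (stand-in Job type and stub processing, not the real module):

use std::collections::HashMap;

use futures::future::{self, JoinAll};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;

/// Stand-in for the real Job enum.
#[derive(Debug)]
struct Job {
    org_name: String,
}

fn handler(
    mut jobs_rx: mpsc::UnboundedReceiver<Job>,
    cancel_token: CancellationToken,
    orgs: Vec<String>,
) -> JoinAll<JoinHandle<()>> {
    let mut handles = Vec::with_capacity(orgs.len() + 1);
    let mut org_txs = HashMap::new();

    // One worker task per organization, each draining its own queue.
    for org in orgs {
        let (tx, mut rx) = mpsc::unbounded_channel::<Job>();
        org_txs.insert(org, tx);
        let token = cancel_token.clone();
        handles.push(tokio::spawn(async move {
            loop {
                tokio::select! {
                    biased;
                    Some(job) = rx.recv() => println!("processing {job:?}"),
                    () = token.cancelled() => break,
                }
            }
        }));
    }

    // Router task: forwards each incoming job to its organization's worker.
    handles.push(tokio::spawn(async move {
        loop {
            tokio::select! {
                biased;
                Some(job) = jobs_rx.recv() => {
                    if let Some(tx) = org_txs.get(&job.org_name) {
                        let _ = tx.send(job);
                    }
                }
                () = cancel_token.cancelled() => break,
            }
        }
    }));

    future::join_all(handles)
}

#[tokio::main]
async fn main() {
    let cancel_token = CancellationToken::new();
    let (jobs_tx, jobs_rx) = mpsc::unbounded_channel();
    let workers = handler(jobs_rx, cancel_token.clone(), vec!["org-a".into(), "org-b".into()]);

    jobs_tx.send(Job { org_name: "org-a".into() }).unwrap();
    cancel_token.cancel();
    workers.await;
}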
37 changes: 21 additions & 16 deletions clowarden-server/src/main.rs
@@ -6,15 +6,14 @@ use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc};
use anyhow::{Context, Result};
use clap::Parser;
use config::{Config, File};
use db::DynDB;
use deadpool_postgres::{Config as DbConfig, Runtime};
use futures::future;
use github::DynGH;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres_openssl::MakeTlsConnector;
use tokio::{
net::TcpListener,
signal,
sync::{broadcast, mpsc},
};
use tokio::{net::TcpListener, signal, sync::mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;

@@ -69,12 +68,12 @@ async fn main() -> Result<()> {
let connector = MakeTlsConnector::new(builder.build());
let db_cfg: DbConfig = cfg.get("db")?;
let pool = db_cfg.create_pool(Some(Runtime::Tokio1), connector)?;
let db = Arc::new(PgDB::new(pool));
let db: DynDB = Arc::new(PgDB::new(pool));

// Setup GitHub clients
let gh_app: core::cfg::GitHubApp = cfg.get("server.githubApp")?;
let gh = Arc::new(github::GHApi::new(&gh_app).context("error setting up github client")?);
let ghc = Arc::new(
let gh: DynGH = Arc::new(github::GHApi::new(&gh_app).context("error setting up github client")?);
let ghc: core::github::DynGH = Arc::new(
core::github::GHApi::new_with_app_creds(&gh_app).context("error setting up core github client")?,
);

@@ -84,18 +83,24 @@
let svc = Arc::new(services::github::service::SvcApi::new_with_app_creds(&gh_app)?);
services.insert(
services::github::SERVICE_NAME,
Box::new(services::github::Handler::new(ghc.clone(), svc)),
Arc::new(services::github::Handler::new(ghc.clone(), svc)),
);
}

// Setup and launch jobs workers
let (stop_tx, _): (broadcast::Sender<()>, _) = broadcast::channel(1);
let cancel_token = CancellationToken::new();
let (jobs_tx, jobs_rx) = mpsc::unbounded_channel();
let jobs_handler = jobs::Handler::new(db.clone(), gh.clone(), ghc.clone(), services);
let jobs_workers_done = future::join_all([
jobs_handler.start(jobs_rx, &stop_tx, cfg.get("organizations")?),
jobs::scheduler(jobs_tx.clone(), stop_tx.subscribe(), cfg.get("organizations")?),
]);
let jobs_handler = jobs::handler(
&db,
&gh,
&ghc,
&services,
jobs_rx,
cancel_token.clone(),
cfg.get("organizations")?,
);
let jobs_scheduler = jobs::scheduler(jobs_tx.clone(), cancel_token.clone(), cfg.get("organizations")?);
let jobs_workers_done = future::join_all([jobs_handler, jobs_scheduler]);

// Setup and launch HTTP server
let router = handlers::setup_router(&cfg, db.clone(), gh.clone(), jobs_tx)
@@ -110,7 +115,7 @@
}

// Ask jobs workers to stop and wait for them to finish
drop(stop_tx);
cancel_token.cancel();
jobs_workers_done.await;
info!("server stopped");

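The shutdown sequence that main.rs lands on above reduces to: wait for ctrl-c, cancel the shared token, then await the future that joins all the workers. A stripped-down sketch of just that ordering (placeholder worker instead of the real jobs handler and HTTP server):

use tokio::signal;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let cancel_token = CancellationToken::new();

    // Placeholder for the jobs workers: exits once the token is cancelled.
    let worker_token = cancel_token.clone();
    let workers_done = tokio::spawn(async move {
        worker_token.cancelled().await;
    });

    // Wait for ctrl-c, then ask the workers to stop and wait for them.
    signal::ctrl_c().await.expect("failed to listen for ctrl-c");
    cancel_token.cancel();
    let _ = workers_done.await;
}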