Skip to content

Commit

Permalink
Merge pull request #56 from tweag/garbage-collection
Browse files Browse the repository at this point in the history
feat: add a garbage collector
  • Loading branch information
Erin van der Veen authored May 20, 2024
2 parents 101d99d + b4e8dee commit 3f70c66
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 35 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

## [Unreleased]

### Added
- [#56](https://github.com/tweag/genealogos/pull/56) adds a garbage collector to the jobs api, to prevent stale jobs from taking up unnecessary memory

## [0.3.0](https://github.com/tweag/genealogos/compare/v0.2.0...v0.3.0)
### Changed
- [#55](https://github.com/tweag/genealogos/pull/55) splits of the `messages()` function into its own trait. This resolves many issues caused by a cargo issue.
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ Finally, getting the result is done with the `result` endpoint:
curl "http://localhost:8000/api/jobs/result/0"
```

#### Configuration
The `genealogos-api` can be configured through [Rocket](https://rocket.rs)'s configuration mechanism.
It uses the [default providers](https://rocket.rs/guide/v0.5/configuration/#default-provider) defined by Rocket.
The [default options](https://rocket.rs/guide/v0.5/configuration/#overview) contain all things related to the webserver itself.
In addition to those default configuration options, Genealogos extends Rockets' configuration with two additional keys:

| key | kind | description | debug/release default |
|------------------|--------------------|--------------------------------------------------------------------|-----------------------|
| `gc_interval` | `u64` (in seconds) | The interval between two invocations of the garbage collector | `10` |
| `gc_stale_after` | `u64` (in seconds) | How long after being touched last a job should be considered stale | `60 * 10` |

### `genealogos-frontend`
Genealogos ships with a pure html/javascript web frontend.
By default, this frontend uses `127.0.0.1` to connect to the `genealogos-api`.
Expand Down
7 changes: 4 additions & 3 deletions genealogos-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ path = "src/main.rs"

[dependencies]
chrono.workspace = true
rocket.workspace = true
genealogos = { workspace = true, features = [ "rocket" ] }
serde_json.workspace = true
serde.workspace = true
log.workspace = true
nixtract.workspace = true
rocket.workspace = true
serde.workspace = true
serde_json.workspace = true

[dev-dependencies]
env_logger.workspace = true
Expand Down
17 changes: 17 additions & 0 deletions genealogos-api/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use super::job_map::{GCInterval, GCStaleAfter};

#[derive(rocket::serde::Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct Config {
#[serde(flatten)]
pub gc: GCConfig,
}

#[derive(rocket::serde::Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct GCConfig {
#[serde(default, rename = "gc_interval")]
pub interval: GCInterval,
#[serde(default, rename = "gc_stale_after")]
pub stale_after: GCStaleAfter,
}
28 changes: 3 additions & 25 deletions genealogos-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,9 @@ use rocket::tokio;

use crate::messages::{self, Result, StatusEnum, StatusResponse};

pub type JobId = u16;
pub mod job_map;

/// This JobMap holds the status of all jobs that are currently running
pub type JobMap = Arc<rocket::tokio::sync::Mutex<std::collections::HashMap<JobId, JobStatus>>>;

pub enum JobStatus {
Stopped,
/// The job is still running, the receiver is used receive status messages from worker threads
Running(Box<dyn genealogos::backend::BackendHandle + Send>),
Done(String, time::Duration),
Error(String),
}

impl ToString for JobStatus {
fn to_string(&self) -> String {
match self {
JobStatus::Running(_) => "running".to_string(),
JobStatus::Done(_, _) => "done".to_string(),
JobStatus::Stopped => "stopped".to_string(),
JobStatus::Error(e) => e.to_owned(),
}
}
}
use job_map::{JobId, JobMap, JobStatus};

#[rocket::get("/create?<installable>&<bom_format>")]
pub async fn create(
Expand Down Expand Up @@ -110,9 +90,7 @@ pub async fn status(
job_id: JobId,
job_map: &rocket::State<JobMap>,
) -> Result<messages::StatusResponse> {
let mut locked_map = job_map
.try_lock()
.map_err(|e| messages::ErrResponse::with_job_id(job_id, e))?;
let mut locked_map = job_map.lock().await;

let status = locked_map.get(&job_id).unwrap_or(&JobStatus::Stopped);

Expand Down
129 changes: 129 additions & 0 deletions genealogos-api/src/jobs/job_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use rocket::tokio::time;

pub type JobMap = std::sync::Arc<rocket::tokio::sync::Mutex<JobHashMap>>;

/// This JobMap holds the status of all jobs that are currently running
pub struct JobHashMap(std::collections::HashMap<JobId, JobMapEntry>);

/// A single entry in the job map, contains all data related to the job and some
/// metadata required for the garbage collector.
pub struct JobMapEntry {
/// Stores the last time this job was accesed. Any job that is not accessed
/// for a certain amount of time is considered stale and will be removed.
last_updated: time::Instant,
/// The status of the job
status: JobStatus,
}

pub type JobId = u16;

/// The status of a single job
pub enum JobStatus {
/// The job has been stopped and is not running anymore, or it has not been started yet
Stopped,
/// The job is still running, the receiver is used receive status messages from worker threads
Running(Box<dyn genealogos::backend::BackendHandle + Send>),
/// The job has finished, the string contains the output of the job
/// and the duration contains how long it took to finish
Done(String, time::Duration),
/// The job has thrown an error, the string contains the error message
Error(String),
}

impl ToString for JobStatus {
fn to_string(&self) -> String {
match self {
JobStatus::Running(_) => "running".to_string(),
JobStatus::Done(_, _) => "done".to_string(),
JobStatus::Stopped => "stopped".to_string(),
JobStatus::Error(e) => format!("Error: {}", e),
}
}
}

impl JobMapEntry {
pub fn new(status: JobStatus) -> Self {
Self {
last_updated: time::Instant::now(),
status,
}
}
}

impl JobHashMap {
pub fn insert(&mut self, job_id: JobId, job_status: JobStatus) {
self.0.insert(job_id, JobMapEntry::new(job_status));
}

pub fn get(&mut self, job_id: &JobId) -> Option<&JobStatus> {
self.0.get(job_id).map(|entry| &entry.status)
}

pub fn remove(&mut self, job_id: &JobId) -> Option<JobStatus> {
self.0.remove(job_id).map(|entry| entry.status)
}

pub(crate) fn new() -> Self {
Self(std::collections::HashMap::new())
}
}

#[derive(rocket::serde::Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct GCInterval(u64);

#[derive(rocket::serde::Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct GCStaleAfter(u64);

impl Default for GCInterval {
fn default() -> Self {
// By default, run the garbage collector once every ten seconds
Self(10)
}
}

impl Default for GCStaleAfter {
fn default() -> Self {
// By default, remove a stale job after 1 hour
Self(60 * 60)
}
}

/// The garbage collector will check for any stale jobs in the `JobMap` and remove them
/// after a certain amount of time. The interval is how often the garbage collector
/// will run, and the remove_after is when a job is considered stale.
/// The garbage collector will run in a loop forever.
/// This function will block the thread it is running in.
///
/// # Arguments
/// * `job_map` - A reference to the `JobMap` that contains all the jobs
/// * `gc_config` - The configuration for the garbage collector
pub async fn garbage_collector(job_map: JobMap, gc_config: crate::config::GCConfig) {
let stale_after = time::Duration::from_secs(gc_config.stale_after.0);
let mut interval = time::interval(time::Duration::from_secs(gc_config.interval.0));

log::info!("Started the garbage collector");

loop {
interval.tick().await;

let mut count: u16 = 0;
let mut job_map = job_map.lock().await;
log::info!("Current job count: {}", job_map.0.len());

// Retain allo jobs that are not stale
job_map.0.retain(|_, entry| {
if entry.last_updated.elapsed() < stale_after {
true
} else {
count += 1;
false
}
});

if count > 0 {
log::info!("Removed {} stale jobs", count);
}
}
}
28 changes: 23 additions & 5 deletions genealogos-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use std::sync::{atomic, Arc};
use genealogos::args::BomArg;
use genealogos::backend::Backend;
use genealogos::bom::Bom;
use jobs::job_map::{self, garbage_collector};
use rocket::http::Status;
use rocket::response::{content, status};
use rocket::serde::json::Json;
use rocket::tokio::sync::Mutex;
use rocket::Request;

mod config;
mod jobs;
mod messages;

Expand Down Expand Up @@ -72,7 +74,16 @@ fn analyze(installable: &str, bom_format: Option<BomArg>) -> Result<messages::An

#[rocket::launch]
fn rocket() -> _ {
rocket::build()
let job_map = Arc::new(Mutex::new(job_map::JobHashMap::new()));

let job_map_clone = job_map.clone();

let rocket = rocket::build();
let figment = rocket.figment();

let config: config::Config = figment.extract().expect("Failed to load configuration");

rocket
.attach(rocket::fairing::AdHoc::on_response("cors", |_req, resp| {
Box::pin(async move {
resp.set_header(rocket::http::Header::new(
Expand All @@ -81,6 +92,16 @@ fn rocket() -> _ {
));
})
}))
.attach(rocket::fairing::AdHoc::on_liftoff(
"garbage_collector",
|_| {
Box::pin(async move {
rocket::tokio::spawn(async move {
garbage_collector(job_map_clone, config.gc).await;
});
})
},
))
.mount("/", rocket::routes![index])
.mount("/api", rocket::routes![analyze])
.register("/api", rocket::catchers![handle_errors])
Expand All @@ -89,10 +110,7 @@ fn rocket() -> _ {
rocket::routes![jobs::create, jobs::status, jobs::result],
)
.register("/api/jobs/", rocket::catchers![handle_errors])
.manage(Arc::new(Mutex::new(std::collections::HashMap::<
jobs::JobId,
jobs::JobStatus,
>::new())))
.manage(job_map)
.manage(atomic::AtomicU16::new(0))
}

Expand Down
7 changes: 5 additions & 2 deletions nix/genealogos-module.nix
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ in
description = lib.mdDoc ''
Configuration file for Genealogos.
Genealogos-api uses rocket as its http implementation.
For all configuration options, see https://rocket.rs/guide/v0.5/configuration/#configuration-parameters
Genealogos-api uses Rocket as its webserver implementation.
For all rocket configuration options, see https://rocket.rs/guide/v0.5/configuration/#configuration-parameters
Genealogos further defines some custom options.
See the projects [README](https://github.com/tweag/genealogos/blob/master/README.md) for more information.
'';
};
};
Expand Down

0 comments on commit 3f70c66

Please sign in to comment.