feat: JOB kit (#157)
**Summary**: Implemented the "kit" (tables and queries) for JOB, the Join Order Benchmark.

**Demo**:
The tables and queries in the workspace:
![Screenshot 2024-04-13 at 15 30 22](https://github.com/cmu-db/optd/assets/20631215/46ee6f67-eb69-4fef-8656-6bc46057bcb6)

**Details**:
* I used a personal fork of
[join-order-benchmark](https://github.com/gregrahn/join-order-benchmark)
in case we need to change it.
* I factored `clonepull_repo()` out into a standalone helper function.
* I refactored `run_command_with_status_check_in_dir()` to be more ergonomic: callers now pass the directory directly instead of wrapping it in an `Option` (see the sketch below).
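
A minimal sketch of the call-site shapes after these refactors (the repo URL is the real one used in `job.rs`; the target directory and the `git log` command are placeholders):

```rust
use std::path::Path;

use optd_perftest::shell;

fn sync_job_kit() -> std::io::Result<()> {
    // Clone the repo if it isn't there yet, otherwise pull the latest changes.
    shell::clonepull_repo(
        "https://github.com/wangpatrick57/job-kit.git",
        Path::new("workspace/job/job-kit"), // placeholder path
    )?;
    // The in-dir variant now takes the directory directly instead of an Option.
    shell::run_command_with_status_check_in_dir("git log -1", Path::new("workspace/job/job-kit"))?;
    Ok(())
}
```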
wangpatrick57 authored Apr 14, 2024
1 parent c288d6f commit 9380a7f
Showing 5 changed files with 178 additions and 31 deletions.
1 change: 0 additions & 1 deletion optd-datafusion-repr/src/cost/base_cost/filter.rs
@@ -748,7 +748,6 @@ mod tests {
));
let expr_tree = bin_op(BinOpType::Lt, col_ref(0), cnst(Value::Int32(15)));
let expr_tree_rev = bin_op(BinOpType::Geq, cnst(Value::Int32(15)), col_ref(0));
// TODO(phw2): make column_refs a function
let column_refs = vec![ColumnRef::BaseTableColumnRef {
table: String::from(TABLE1_NAME),
col_idx: 0,
140 changes: 140 additions & 0 deletions optd-perftest/src/job.rs
@@ -0,0 +1,140 @@
/// A wrapper around job-kit (https://github.com/wangpatrick57/job-kit)
use serde::{Deserialize, Serialize};

use crate::shell;
use std::fmt::{self, Display, Formatter};
use std::fs;
use std::fs::File;
use std::io;
use std::path::{Path, PathBuf};

const JOB_KIT_REPO_URL: &str = "https://github.com/wangpatrick57/job-kit.git";
const JOB_TABLES_URL: &str = "https://homepages.cwi.nl/~boncz/job/imdb.tgz";

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct JobConfig {
pub query_ids: Vec<u32>,
}

impl Display for JobConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
// Use write! macro to write formatted string to `f`
write!(f, "JobConfig(query_ids={:?})", self.query_ids,)
}
}

/// Provides helper functions for running a JOB workload.
/// It does not actually execute the queries, as it is meant to be DBMS-agnostic.
/// It is essentially a wrapper around the job-kit repo; since it is conceptually
/// a wrapper around the repo, I chose _not_ to make JobConfig an initialization
/// parameter.
pub struct JobKit {
_workspace_dpath: PathBuf,

// cache these paths so we don't have to build them multiple times
job_dpath: PathBuf,
job_kit_repo_dpath: PathBuf,
downloaded_tables_dpath: PathBuf,
queries_dpath: PathBuf,
pub schema_fpath: PathBuf,
pub indexes_fpath: PathBuf,
}

impl JobKit {
pub fn build<P: AsRef<Path>>(workspace_dpath: P) -> io::Result<Self> {
log::debug!("[start] building JobKit");

// build the paths, creating some of the directories if they don't exist yet
let workspace_dpath = workspace_dpath.as_ref().to_path_buf();
let job_dpath = workspace_dpath.join("job");
if !job_dpath.exists() {
fs::create_dir(&job_dpath)?;
}
let job_kit_repo_dpath = job_dpath.join("job-kit");
let queries_dpath = job_kit_repo_dpath.join("queries");
let downloaded_tables_dpath = job_dpath.join("downloaded_tables");
if !downloaded_tables_dpath.exists() {
fs::create_dir(&downloaded_tables_dpath)?;
}
let schema_fpath = job_kit_repo_dpath.join("schema.sql");
let indexes_fpath = job_kit_repo_dpath.join("fkindexes.sql");

// create Self
let kit = JobKit {
_workspace_dpath: workspace_dpath,
job_dpath,
job_kit_repo_dpath,
queries_dpath,
downloaded_tables_dpath,
schema_fpath,
indexes_fpath,
};

// setup
shell::clonepull_repo(JOB_KIT_REPO_URL, &kit.job_kit_repo_dpath)?;

log::debug!("[end] building TpchKit");
Ok(kit)
}

/// Download the .csv files for all tables of JOB
pub fn download_tables(&self, job_config: &JobConfig) -> io::Result<()> {
let done_fpath = self.downloaded_tables_dpath.join("download_tables_done");
if !done_fpath.exists() {
log::debug!("[start] downloading tables for {}", job_config);
// Instructions are from https://cedardb.com/docs/guides/example_datasets/job/, not from the job-kit repo.
shell::run_command_with_status_check_in_dir(
&format!("curl -O {JOB_TABLES_URL}"),
&self.job_dpath,
)?;
shell::make_into_empty_dir(&self.downloaded_tables_dpath)?;
shell::run_command_with_status_check_in_dir(
"tar -zxvf ../imdb.tgz",
&self.downloaded_tables_dpath,
)?;
shell::run_command_with_status_check_in_dir("rm imdb.tgz", &self.job_dpath)?;
File::create(done_fpath)?;
log::debug!("[end] downloading tables for {}", job_config);
} else {
log::debug!("[skip] downloading tables for {}", job_config);
}
Ok(())
}

/// Convert a tbl_fpath into the table name
pub fn get_tbl_name_from_tbl_fpath<P: AsRef<Path>>(tbl_fpath: P) -> String {
tbl_fpath
.as_ref()
.file_stem()
.unwrap()
.to_str()
.unwrap()
.to_string()
}

/// Get an iterator over all downloaded .csv table files
pub fn get_tbl_fpath_iter(&self) -> io::Result<impl Iterator<Item = PathBuf>> {
let dirent_iter = fs::read_dir(&self.downloaded_tables_dpath)?;
// all results/options are fine to be unwrapped except for path.extension() because that could
// return None in various cases
let path_iter = dirent_iter.map(|dirent| dirent.unwrap().path());
let tbl_fpath_iter = path_iter
.filter(|path| path.extension().map(|ext| ext.to_str().unwrap()) == Some("csv"));
Ok(tbl_fpath_iter)
}

/// Get an iterator over the .sql query files of a given config, _in order_ of query ID.
/// It's important to iterate _in order_ due to the interface of CardtestRunnerDBMSHelper.
pub fn get_sql_fpath_ordered_iter(
&self,
job_config: &JobConfig,
) -> io::Result<impl Iterator<Item = (u32, PathBuf)>> {
let queries_dpath = self.queries_dpath.clone();
let sql_fpath_ordered_iter = job_config
.query_ids
.clone()
.into_iter()
.map(move |query_id| (query_id, queries_dpath.join(format!("{}.sql", query_id))));
Ok(sql_fpath_ordered_iter)
}
}
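
For orientation, a hypothetical driver for the new kit (the workspace path and query IDs are placeholders; the real caller, presumably the cardtest code, is not shown in this commit):

```rust
use optd_perftest::job::{JobConfig, JobKit};

fn load_job_queries() -> std::io::Result<()> {
    // Clones/pulls job-kit under <workspace>/job/job-kit as part of build().
    let kit = JobKit::build("workspace")?;

    // Download and unpack the IMDB .csv tables (skipped if already downloaded).
    let job_config = JobConfig { query_ids: vec![1, 2, 3] };
    kit.download_tables(&job_config)?;

    // Walk the query files in the order the cardinality tester expects.
    for (query_id, sql_fpath) in kit.get_sql_fpath_ordered_iter(&job_config)? {
        println!("query {} -> {}", query_id, sql_fpath.display());
    }
    Ok(())
}
```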
1 change: 1 addition & 0 deletions optd-perftest/src/lib.rs
@@ -1,6 +1,7 @@
mod benchmark;
pub mod cardtest;
mod datafusion_dbms;
pub mod job;
mod postgres_dbms;
pub mod shell;
pub mod tpch;
31 changes: 30 additions & 1 deletion optd-perftest/src/shell.rs
@@ -6,11 +6,20 @@ use std::{fs, io};
/// Runs a command, exiting the program immediately if the command fails
pub fn run_command_with_status_check(cmd_str: &str) -> io::Result<Output> {
// we need to bind it to some arbitrary type that implements AsRef<Path>. I just chose &Path
run_command_with_status_check_in_dir::<&Path>(cmd_str, None)
run_command_with_status_check_core::<&Path>(cmd_str, None)
}

/// Runs a command in a directory, exiting the program immediately if the command fails
pub fn run_command_with_status_check_in_dir<P: AsRef<Path>>(
cmd_str: &str,
in_path: P,
) -> io::Result<Output> {
run_command_with_status_check_core::<P>(cmd_str, Some(in_path))
}

/// This function exposes all the different ways to run a command, but the interface is not ergonomic.
/// The ergonomic wrappers above are a workaround for Rust not having default values on parameters.
pub fn run_command_with_status_check_core<P: AsRef<Path>>(
cmd_str: &str,
in_path: Option<P>,
) -> io::Result<Output> {
@@ -79,3 +88,23 @@ pub fn parse_pathstr(pathstr: &str) -> io::Result<PathBuf> {
};
Ok(path)
}

/// Bring a repo to its latest state by cloning it if it doesn't exist, or pulling it otherwise
pub fn clonepull_repo<P: AsRef<Path>>(repo_url: &str, repo_dpath: P) -> io::Result<()> {
if !repo_dpath.as_ref().exists() {
log::debug!("[start] cloning {} repo", repo_url);
run_command_with_status_check(&format!(
"git clone {} {}",
repo_url,
repo_dpath.as_ref().to_str().unwrap()
))?;
log::debug!("[end] cloning {} repo", repo_url);
} else {
log::debug!("[skip] cloning {} repo", repo_url);
}
log::debug!("[start] pulling latest {} repo", repo_url);
run_command_with_status_check_in_dir("git pull", &repo_dpath)?;
log::debug!("[end] pulling latest {} repo", repo_url);
// make sure to do this so that get_optd_root() doesn't break
Ok(())
}
36 changes: 7 additions & 29 deletions optd-perftest/src/tpch.rs
@@ -1,7 +1,7 @@
/// A wrapper around tpch-kit
use serde::{Deserialize, Serialize};

use crate::shell;
/// A wrapper around tpch-kit (https://github.com/gregrahn/tpch-kit)
use std::env;
use std::env::consts::OS;
use std::fmt::{self, Display, Formatter};
@@ -53,7 +53,6 @@ pub struct TpchKit {
pub indexes_fpath: PathBuf,
}

/// I keep the same conventions for these methods as I do for PostgresDBMS
impl TpchKit {
pub fn build<P: AsRef<Path>>(workspace_dpath: P) -> io::Result<Self> {
log::debug!("[start] building TpchKit");
@@ -93,44 +92,23 @@ impl TpchKit {
indexes_fpath,
};

// set envvars (DSS_PATH can change so we don't set it now)
// setup
env::set_var("DSS_CONFIG", kit.dbgen_dpath.to_str().unwrap());
env::set_var("DSS_QUERY", kit.queries_dpath.to_str().unwrap());

// do setup after creating kit
kit.clonepull_tpch_kit_repo()?;
shell::clonepull_repo(TPCH_KIT_REPO_URL, &kit.tpch_kit_repo_dpath)?;

log::debug!("[end] building TpchKit");
Ok(kit)
}

fn clonepull_tpch_kit_repo(&self) -> io::Result<()> {
if !self.tpch_kit_repo_dpath.exists() {
log::debug!("[start] cloning tpch-kit repo");
shell::run_command_with_status_check(&format!(
"git clone {} {}",
TPCH_KIT_REPO_URL,
self.tpch_kit_repo_dpath.to_str().unwrap()
))?;
log::debug!("[end] cloning tpch-kit repo");
} else {
log::debug!("[skip] cloning tpch-kit repo");
}
log::debug!("[start] pulling latest tpch-kit repo");
shell::run_command_with_status_check_in_dir("git pull", Some(&self.tpch_kit_repo_dpath))?;
log::debug!("[end] pulling latest tpch-kit repo");
// make sure to do this so that get_optd_root() doesn't break
Ok(())
}

pub fn make(&self, dbms: &str) -> io::Result<()> {
log::debug!("[start] building dbgen");
// we need to call "make clean" because we might have called make earlier with
// a different dbms
shell::run_command_with_status_check_in_dir("make clean", Some(&self.dbgen_dpath))?;
shell::run_command_with_status_check_in_dir("make clean", &self.dbgen_dpath)?;
shell::run_command_with_status_check_in_dir(
&format!("make MACHINE={} DATABASE={}", TpchKit::get_machine(), dbms),
Some(&self.dbgen_dpath),
&self.dbgen_dpath,
)?;
log::debug!("[end] building dbgen");
Ok(())
@@ -156,7 +134,7 @@ impl TpchKit {
log::debug!("[start] generating tables for {}", tpch_config);
shell::run_command_with_status_check_in_dir(
&format!("./dbgen -s{}", tpch_config.scale_factor),
Some(&self.dbgen_dpath),
&self.dbgen_dpath,
)?;
File::create(done_fpath)?;
log::debug!("[end] generating tables for {}", tpch_config);
@@ -181,7 +159,7 @@ impl TpchKit {
"./qgen -s{} -r{} {}",
tpch_config.scale_factor, tpch_config.seed, query_i
),
Some(&self.dbgen_dpath),
&self.dbgen_dpath,
)?;
let this_genned_queries_fpath =
this_genned_queries_dpath.join(format!("{}.sql", query_i));