From 9380a7fcb5db30b3bda6f994d09b73179953c6fc Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sun, 14 Apr 2024 17:56:11 -0400 Subject: [PATCH] feat: JOB kit (#157) **Summary**: Implemented the "kit" (tables and queries) for JOB **Demo**: The tables and queries in the workspace. ![Screenshot 2024-04-13 at 15 30 22](https://github.com/cmu-db/optd/assets/20631215/46ee6f67-eb69-4fef-8656-6bc46057bcb6) **Details**: * I used a personal fork of [join-order-benchmark](https://github.com/gregrahn/join-order-benchmark) in case we need to change it. * I factored `clonepull_repo()` out into a standalone helper function. * I made `run_command_with_status_check_in_dir()` more ergonomic as well with a refactoring. --- .../src/cost/base_cost/filter.rs | 1 - optd-perftest/src/job.rs | 140 ++++++++++++++++++ optd-perftest/src/lib.rs | 1 + optd-perftest/src/shell.rs | 31 +++- optd-perftest/src/tpch.rs | 36 +---- 5 files changed, 178 insertions(+), 31 deletions(-) create mode 100644 optd-perftest/src/job.rs diff --git a/optd-datafusion-repr/src/cost/base_cost/filter.rs b/optd-datafusion-repr/src/cost/base_cost/filter.rs index 04595dca..ed029852 100644 --- a/optd-datafusion-repr/src/cost/base_cost/filter.rs +++ b/optd-datafusion-repr/src/cost/base_cost/filter.rs @@ -748,7 +748,6 @@ mod tests { )); let expr_tree = bin_op(BinOpType::Lt, col_ref(0), cnst(Value::Int32(15))); let expr_tree_rev = bin_op(BinOpType::Geq, cnst(Value::Int32(15)), col_ref(0)); - // TODO(phw2): make column_refs a function let column_refs = vec![ColumnRef::BaseTableColumnRef { table: String::from(TABLE1_NAME), col_idx: 0, diff --git a/optd-perftest/src/job.rs b/optd-perftest/src/job.rs new file mode 100644 index 00000000..e71ae57f --- /dev/null +++ b/optd-perftest/src/job.rs @@ -0,0 +1,140 @@ +/// A wrapper around job-kit +use serde::{Deserialize, Serialize}; + +use crate::shell; +use std::fmt::{self, Display, Formatter}; +use std::fs; +use std::fs::File; +use std::io; +use std::path::{Path, PathBuf}; + +const 
JOB_KIT_REPO_URL: &str = "https://github.com/wangpatrick57/job-kit.git"; +const JOB_TABLES_URL: &str = "https://homepages.cwi.nl/~boncz/job/imdb.tgz"; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct JobConfig { + pub query_ids: Vec, +} + +impl Display for JobConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + // Use write! macro to write formatted string to `f` + write!(f, "JobConfig(query_ids={:?})", self.query_ids,) + } +} + +/// Provides many helper functions for running a JOB workload. +/// It does not actually execute the queries as it is meant to be DBMS-agnostic. +/// Is essentially a wrapper around the job-kit repo. +/// Since it's conceptually a wrapper around the repo, I chose _not_ to make +/// JobConfig an initialization parameter. +pub struct JobKit { + _workspace_dpath: PathBuf, + + // cache these paths so we don't have to build them multiple times + job_dpath: PathBuf, + job_kit_repo_dpath: PathBuf, + downloaded_tables_dpath: PathBuf, + queries_dpath: PathBuf, + pub schema_fpath: PathBuf, + pub indexes_fpath: PathBuf, +} + +impl JobKit { + pub fn build>(workspace_dpath: P) -> io::Result { + log::debug!("[start] building JobKit"); + + // build paths, sometimes creating them if they don't exist + let workspace_dpath = workspace_dpath.as_ref().to_path_buf(); + let job_dpath = workspace_dpath.join("job"); + if !job_dpath.exists() { + fs::create_dir(&job_dpath)?; + } + let job_kit_repo_dpath = job_dpath.join("job-kit"); + let queries_dpath = job_kit_repo_dpath.join("queries"); + let downloaded_tables_dpath = job_dpath.join("downloaded_tables"); + if !downloaded_tables_dpath.exists() { + fs::create_dir(&downloaded_tables_dpath)?; + } + let schema_fpath = job_kit_repo_dpath.join("schema.sql"); + let indexes_fpath = job_kit_repo_dpath.join("fkindexes.sql"); + + // create Self + let kit = JobKit { + _workspace_dpath: workspace_dpath, + job_dpath, + job_kit_repo_dpath, + queries_dpath, + downloaded_tables_dpath, + schema_fpath, + 
indexes_fpath, + }; + + // setup + shell::clonepull_repo(JOB_KIT_REPO_URL, &kit.job_kit_repo_dpath)?; + + log::debug!("[end] building TpchKit"); + Ok(kit) + } + + /// Download the .csv files for all tables of JOB + pub fn download_tables(&self, job_config: &JobConfig) -> io::Result<()> { + let done_fpath = self.downloaded_tables_dpath.join("download_tables_done"); + if !done_fpath.exists() { + log::debug!("[start] downloading tables for {}", job_config); + // Instructions are from https://cedardb.com/docs/guides/example_datasets/job/, not from the job-kit repo. + shell::run_command_with_status_check_in_dir( + &format!("curl -O {JOB_TABLES_URL}"), + &self.job_dpath, + )?; + shell::make_into_empty_dir(&self.downloaded_tables_dpath)?; + shell::run_command_with_status_check_in_dir( + "tar -zxvf ../imdb.tgz", + &self.downloaded_tables_dpath, + )?; + shell::run_command_with_status_check_in_dir("rm imdb.tgz", &self.job_dpath)?; + File::create(done_fpath)?; + log::debug!("[end] downloading tables for {}", job_config); + } else { + log::debug!("[skip] downloading tables for {}", job_config); + } + Ok(()) + } + + /// Convert a tbl_fpath into the table name + pub fn get_tbl_name_from_tbl_fpath>(tbl_fpath: P) -> String { + tbl_fpath + .as_ref() + .file_stem() + .unwrap() + .to_str() + .unwrap() + .to_string() + } + + /// Get an iterator through all generated .tbl files of a given config + pub fn get_tbl_fpath_iter(&self) -> io::Result> { + let dirent_iter = fs::read_dir(&self.downloaded_tables_dpath)?; + // all results/options are fine to be unwrapped except for path.extension() because that could + // return None in various cases + let path_iter = dirent_iter.map(|dirent| dirent.unwrap().path()); + let tbl_fpath_iter = path_iter + .filter(|path| path.extension().map(|ext| ext.to_str().unwrap()) == Some("csv")); + Ok(tbl_fpath_iter) + } + + /// Get an iterator through all generated .sql files _in order_ of a given config + /// It's important to iterate _in order_ due to the 
interface of CardtestRunnerDBMSHelper + pub fn get_sql_fpath_ordered_iter( + &self, + job_config: &JobConfig, + ) -> io::Result> { + let queries_dpath = self.queries_dpath.clone(); + let sql_fpath_ordered_iter = job_config + .query_ids + .clone() + .into_iter() + .map(move |query_id| (query_id, queries_dpath.join(format!("{}.sql", query_id)))); + Ok(sql_fpath_ordered_iter) + } +} diff --git a/optd-perftest/src/lib.rs b/optd-perftest/src/lib.rs index 0c6e9230..058185ef 100644 --- a/optd-perftest/src/lib.rs +++ b/optd-perftest/src/lib.rs @@ -1,6 +1,7 @@ mod benchmark; pub mod cardtest; mod datafusion_dbms; +pub mod job; mod postgres_dbms; pub mod shell; pub mod tpch; diff --git a/optd-perftest/src/shell.rs b/optd-perftest/src/shell.rs index fc0529e1..70698c43 100644 --- a/optd-perftest/src/shell.rs +++ b/optd-perftest/src/shell.rs @@ -6,11 +6,20 @@ use std::{fs, io}; /// Runs a command, exiting the program immediately if the command fails pub fn run_command_with_status_check(cmd_str: &str) -> io::Result { // we need to bind it to some arbitrary type that implements AsRef. I just chose &Path - run_command_with_status_check_in_dir::<&Path>(cmd_str, None) + run_command_with_status_check_core::<&Path>(cmd_str, None) } /// Runs a command in a directory, exiting the program immediately if the command fails pub fn run_command_with_status_check_in_dir>( + cmd_str: &str, + in_path: P, +) -> io::Result { + run_command_with_status_check_core::
<P>
(cmd_str, Some(in_path)) +} + +/// This function exposes all the different ways to run a command, but the interface is not ergonomic. +/// The ergonomic wrappers above are a workaround for Rust not having default values on parameters. +pub fn run_command_with_status_check_core>( cmd_str: &str, in_path: Option
<P>
, ) -> io::Result { @@ -79,3 +88,23 @@ pub fn parse_pathstr(pathstr: &str) -> io::Result { }; Ok(path) } + +/// Get a repo to its latest state by either cloning or pulling +pub fn clonepull_repo>(repo_url: &str, repo_dpath: P) -> io::Result<()> { + if !repo_dpath.as_ref().exists() { + log::debug!("[start] cloning {} repo", repo_url); + run_command_with_status_check(&format!( + "git clone {} {}", + repo_url, + repo_dpath.as_ref().to_str().unwrap() + ))?; + log::debug!("[end] cloning {} repo", repo_url); + } else { + log::debug!("[skip] cloning {} repo", repo_url); + } + log::debug!("[start] pulling latest {} repo", repo_url); + run_command_with_status_check_in_dir("git pull", &repo_dpath)?; + log::debug!("[end] pulling latest {} repo", repo_url); + // make sure to do this so that get_optd_root() doesn't break + Ok(()) +} diff --git a/optd-perftest/src/tpch.rs b/optd-perftest/src/tpch.rs index 88723914..db091008 100644 --- a/optd-perftest/src/tpch.rs +++ b/optd-perftest/src/tpch.rs @@ -1,7 +1,7 @@ +/// A wrapper around tpch-kit use serde::{Deserialize, Serialize}; use crate::shell; -/// A wrapper around tpch-kit (https://github.com/gregrahn/tpch-kit) use std::env; use std::env::consts::OS; use std::fmt::{self, Display, Formatter}; @@ -53,7 +53,6 @@ pub struct TpchKit { pub indexes_fpath: PathBuf, } -/// I keep the same conventions for these methods as I do for PostgresDBMS impl TpchKit { pub fn build>(workspace_dpath: P) -> io::Result { log::debug!("[start] building TpchKit"); @@ -93,44 +92,23 @@ impl TpchKit { indexes_fpath, }; - // set envvars (DSS_PATH can change so we don't set it now) + // setup env::set_var("DSS_CONFIG", kit.dbgen_dpath.to_str().unwrap()); env::set_var("DSS_QUERY", kit.queries_dpath.to_str().unwrap()); - - // do setup after creating kit - kit.clonepull_tpch_kit_repo()?; + shell::clonepull_repo(TPCH_KIT_REPO_URL, &kit.tpch_kit_repo_dpath)?; log::debug!("[end] building TpchKit"); Ok(kit) } - fn clonepull_tpch_kit_repo(&self) -> io::Result<()> { - 
if !self.tpch_kit_repo_dpath.exists() { - log::debug!("[start] cloning tpch-kit repo"); - shell::run_command_with_status_check(&format!( - "git clone {} {}", - TPCH_KIT_REPO_URL, - self.tpch_kit_repo_dpath.to_str().unwrap() - ))?; - log::debug!("[end] cloning tpch-kit repo"); - } else { - log::debug!("[skip] cloning tpch-kit repo"); - } - log::debug!("[start] pulling latest tpch-kit repo"); - shell::run_command_with_status_check_in_dir("git pull", Some(&self.tpch_kit_repo_dpath))?; - log::debug!("[end] pulling latest tpch-kit repo"); - // make sure to do this so that get_optd_root() doesn't break - Ok(()) - } - pub fn make(&self, dbms: &str) -> io::Result<()> { log::debug!("[start] building dbgen"); // we need to call "make clean" because we might have called make earlier with // a different dbms - shell::run_command_with_status_check_in_dir("make clean", Some(&self.dbgen_dpath))?; + shell::run_command_with_status_check_in_dir("make clean", &self.dbgen_dpath)?; shell::run_command_with_status_check_in_dir( &format!("make MACHINE={} DATABASE={}", TpchKit::get_machine(), dbms), - Some(&self.dbgen_dpath), + &self.dbgen_dpath, )?; log::debug!("[end] building dbgen"); Ok(()) @@ -156,7 +134,7 @@ impl TpchKit { log::debug!("[start] generating tables for {}", tpch_config); shell::run_command_with_status_check_in_dir( &format!("./dbgen -s{}", tpch_config.scale_factor), - Some(&self.dbgen_dpath), + &self.dbgen_dpath, )?; File::create(done_fpath)?; log::debug!("[end] generating tables for {}", tpch_config); @@ -181,7 +159,7 @@ impl TpchKit { "./qgen -s{} -r{} {}", tpch_config.scale_factor, tpch_config.seed, query_i ), - Some(&self.dbgen_dpath), + &self.dbgen_dpath, )?; let this_genned_queries_fpath = this_genned_queries_dpath.join(format!("{}.sql", query_i));