From 29647f207d8ef271713310ff2859e6f110250b61 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Sun, 3 Nov 2024 10:55:20 -0500 Subject: [PATCH] refactor(core): separate cost model and stat estimation (#209) part of https://github.com/cmu-db/optd/issues/195 * also refactors the adv-cost crate to purely support cardinality estimation * rewrite optimize input task to reduce dead code (which might have some effect on the plan output?) * use `cargo run -p optd-sqlplannertest --bin planner_test_apply -- --enable-advanced-cost-model` to experiment with the advanced cost model --------- Signed-off-by: Alex Chi --- Cargo.lock | 145 +++++++++++ datafusion-optd-cli/src/main.rs | 2 +- optd-core/Cargo.toml | 3 +- optd-core/src/cascades/memo.rs | 152 ++++++++---- optd-core/src/cascades/optimizer.rs | 94 +++----- .../src/cascades/tasks/optimize_group.rs | 2 +- .../src/cascades/tasks/optimize_inputs.rs | 226 +++++++----------- optd-core/src/cost.rs | 40 +++- optd-core/src/rel_node.rs | 35 ++- .../src/adaptive_cost.rs | 108 --------- .../src/{adv_cost.rs => adv_stats.rs} | 186 +------------- .../src/{adv_cost => adv_stats}/agg.rs | 30 +-- .../src/{adv_cost => adv_stats}/filter.rs | 25 +- .../{adv_cost => adv_stats}/filter/in_list.rs | 8 +- .../{adv_cost => adv_stats}/filter/like.rs | 8 +- .../src/{adv_cost => adv_stats}/join.rs | 52 ++-- .../src/{adv_cost => adv_stats}/limit.rs | 30 +-- .../src/{adv_cost => adv_stats}/stats.rs | 0 .../src/bin/test_optimize_adv.rs | 109 --------- optd-datafusion-repr-adv-cost/src/lib.rs | 151 +++++++++++- optd-datafusion-repr/Cargo.toml | 1 + optd-datafusion-repr/src/bin/test_optimize.rs | 8 +- optd-datafusion-repr/src/cost.rs | 2 +- .../src/cost/adaptive_cost.rs | 89 ++++--- optd-datafusion-repr/src/cost/base_cost.rs | 219 +++++++++-------- optd-datafusion-repr/src/explain.rs | 14 +- optd-datafusion-repr/src/lib.rs | 4 - .../src/testing/dummy_cost.rs | 49 ++-- optd-perfbench/src/datafusion_dbms.rs | 2 +- optd-sqlplannertest/Cargo.toml | 1 + .../src/bin/planner_test_apply.rs | 19 +- optd-sqlplannertest/src/lib.rs | 34 ++- .../tests/basic/verbose.planner.sql | 7 +- .../tests/joins/join_enumerate.planner.sql | 16 +- .../tests/joins/join_enumerate.yml | 10 +- .../subqueries/subquery_unnesting.planner.sql | 20 +- .../tests/subqueries/subquery_unnesting.yml | 2 +- .../tests/tpch/tpch-06-10.planner.sql | 33 ++- .../tests/tpch/tpch-11-15.planner.sql | 88 +++---- 39 files changed, 1002 insertions(+), 1022 deletions(-) delete mode 100644 optd-datafusion-repr-adv-cost/src/adaptive_cost.rs rename optd-datafusion-repr-adv-cost/src/{adv_cost.rs => adv_stats.rs} (66%) rename optd-datafusion-repr-adv-cost/src/{adv_cost => adv_stats}/agg.rs (76%) rename optd-datafusion-repr-adv-cost/src/{adv_cost => adv_stats}/filter.rs (98%) rename optd-datafusion-repr-adv-cost/src/{adv_cost => adv_stats}/filter/in_list.rs (97%) rename optd-datafusion-repr-adv-cost/src/{adv_cost => adv_stats}/filter/like.rs (98%) rename optd-datafusion-repr-adv-cost/src/{adv_cost => adv_stats}/join.rs (97%) rename optd-datafusion-repr-adv-cost/src/{adv_cost => adv_stats}/limit.rs (62%) rename optd-datafusion-repr-adv-cost/src/{adv_cost => adv_stats}/stats.rs (100%) delete mode 100644 optd-datafusion-repr-adv-cost/src/bin/test_optimize_adv.rs diff --git a/Cargo.lock b/Cargo.lock index 15f2e142..149d4264 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1734,6 +1734,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "erased-serde" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b73807008a3c7f171cc40312f37d95ef0396e048b5848d775f54b1a4dd4a0d3" +dependencies = [ + "serde", +] + [[package]] name = "errno" version = "0.3.8" @@ -2754,6 +2763,7 @@ dependencies = [ "ordered-float 4.2.0", "serde", "tracing", + "value-bag", ] [[package]] @@ -2788,6 +2798,7 @@ dependencies = [ "tracing", "tracing-subscriber", "union-find", + "value-bag", ] [[package]] @@ -2871,6 +2882,7 @@ dependencies = [ "mimalloc", "optd-datafusion-bridge", "optd-datafusion-repr", + "optd-datafusion-repr-adv-cost", "regex", "sqlplannertest", "tokio", @@ -3652,6 +3664,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_buf" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a86be9d6c7d34718d2ec6f56c8d6a4671d1a7357c2a6921f47fe5a3ee6056cc" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.196" @@ -3663,6 +3684,15 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "serde_fmt" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4" +dependencies = [ + "serde", +] + [[package]] name = "serde_json" version = "1.0.116" @@ -3933,6 +3963,84 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +[[package]] +name = "sval" +version = "2.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6dc0f9830c49db20e73273ffae9b5240f63c42e515af1da1fceefb69fceafd8" + +[[package]] +name = "sval_buffer" +version = "2.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "429922f7ad43c0ef8fd7309e14d750e38899e32eb7e8da656ea169dd28ee212f" +dependencies = [ + "sval", + "sval_ref", +] + +[[package]] +name = "sval_dynamic" +version = "2.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f16ff5d839396c11a30019b659b0976348f3803db0626f736764c473b50ff4" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_fmt" +version = "2.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c01c27a80b6151b0557f9ccbe89c11db571dc5f68113690c1e028d7e974bae94" +dependencies = [ + "itoa", + "ryu", + "sval", +] + +[[package]] +name = "sval_json" +version = "2.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0deef63c70da622b2a8069d8600cf4b05396459e665862e7bdb290fd6cf3f155" +dependencies = [ + "itoa", + "ryu", + "sval", +] + +[[package]] +name = "sval_nested" +version = "2.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a39ce5976ae1feb814c35d290cf7cf8cd4f045782fe1548d6bc32e21f6156e9f" +dependencies = [ + "sval", + "sval_buffer", + "sval_ref", +] + +[[package]] +name = "sval_ref" +version = "2.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb7c6ee3751795a728bc9316a092023529ffea1783499afbc5c66f5fabebb1fa" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_serde" +version = "2.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a5572d0321b68109a343634e3a5d576bf131b82180c6c442dee06349dfc652a" +dependencies = [ + "serde", + "sval", + "sval_nested", +] + [[package]] name = "syn" version = "1.0.109" @@ -4481,6 +4589,43 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" +dependencies = [ + "value-bag-serde1", + "value-bag-sval2", +] + +[[package]] +name = "value-bag-serde1" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bb773bd36fd59c7ca6e336c94454d9c66386416734817927ac93d81cb3c5b0b" +dependencies = [ + "erased-serde", + "serde", + "serde_buf", + "serde_fmt", +] + +[[package]] +name = "value-bag-sval2" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a916a702cac43a88694c97657d449775667bcd14b70419441d05b7fea4a83a" +dependencies = [ + "sval", + "sval_buffer", + "sval_dynamic", + "sval_fmt", + "sval_json", + "sval_ref", + "sval_serde", +] + [[package]] name = "version_check" version = "0.9.4" diff --git a/datafusion-optd-cli/src/main.rs b/datafusion-optd-cli/src/main.rs index 3223f00d..520884f4 100644 --- a/datafusion-optd-cli/src/main.rs +++ b/datafusion-optd-cli/src/main.rs @@ -31,7 +31,7 @@ use datafusion_optd_cli::{ use mimalloc::MiMalloc; use optd_datafusion_bridge::{DatafusionCatalog, OptdQueryPlanner}; use optd_datafusion_repr::DatafusionOptimizer; -use optd_datafusion_repr_adv_cost::adv_cost::stats::BaseTableStats; +use optd_datafusion_repr_adv_cost::adv_stats::stats::BaseTableStats; use std::collections::HashMap; use std::env; use std::path::Path; diff --git a/optd-core/Cargo.toml b/optd-core/Cargo.toml index 241ee459..9a323933 100644 --- a/optd-core/Cargo.toml +++ b/optd-core/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1" tracing = "0.1" ordered-float = "4" itertools = "0.11" -serde = {version = "1.0", features = ["derive", "rc"]} +serde = { version = "1.0", features = ["derive", "rc"] } arrow-schema = "47.0.0" chrono = "0.4" +value-bag = { version = "1", features = ["owned"] } diff --git a/optd-core/src/cascades/memo.rs b/optd-core/src/cascades/memo.rs index 364bab6b..31f1588c 100644 --- a/optd-core/src/cascades/memo.rs +++ b/optd-core/src/cascades/memo.rs @@ -3,15 +3,15 @@ use std::{ sync::Arc, }; -use anyhow::{bail, Result}; +use anyhow::{bail, Context, Result}; use itertools::Itertools; use std::any::Any; use tracing::trace; use crate::{ - cost::Cost, + cost::{Cost, Statistics}, property::PropertyBuilderAny, - rel_node::{RelNode, RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp, Value}, + rel_node::{RelNode, RelNodeRef, RelNodeTyp, Value}, }; use super::optimizer::{ExprId, GroupId}; @@ -53,16 +53,49 @@ impl std::fmt::Display for RelMemoNode { } } -#[derive(Default, Debug, Clone)] -pub struct Winner { - pub impossible: bool, +#[derive(Debug, Clone)] +pub struct WinnerInfo { pub expr_id: ExprId, - pub cost: Cost, + pub total_weighted_cost: f64, + pub operation_weighted_cost: f64, + pub total_cost: Cost, + pub operation_cost: Cost, + pub statistics: Arc, +} + +#[derive(Debug, Clone)] +pub enum Winner { + Unknown, + Impossible, + Full(WinnerInfo), +} + +impl Winner { + pub fn has_full_winner(&self) -> bool { + matches!(self, Self::Full { .. }) + } + + pub fn has_decided(&self) -> bool { + matches!(self, Self::Full { .. } | Self::Impossible) + } + + pub fn as_full_winner(&self) -> Option<&WinnerInfo> { + match self { + Self::Full(info) => Some(info), + _ => None, + } + } +} + +impl Default for Winner { + fn default() -> Self { + Self::Unknown + } } #[derive(Default, Debug, Clone)] pub struct GroupInfo { - pub winner: Option, + pub winner: Winner, } pub(crate) struct Group { @@ -450,14 +483,17 @@ impl Memo { } pub fn update_group_info(&mut self, group_id: GroupId, group_info: GroupInfo) { - if let Some(ref winner) = group_info.winner { - if !winner.impossible { - assert!( - winner.cost.0[0] != 0.0, - "{}", - self.expr_id_to_expr_node[&winner.expr_id] - ); - } + if let Winner::Full(WinnerInfo { + total_weighted_cost, + expr_id, + .. + }) = &group_info.winner + { + assert!( + *total_weighted_cost != 0.0, + "{}", + self.expr_id_to_expr_node[expr_id] + ); } let grp = self.groups.get_mut(&group_id); grp.unwrap().info = group_info; @@ -465,14 +501,25 @@ impl Memo { /// Get all bindings of a predicate group. Will panic if the group contains more than one bindings. pub fn get_predicate_binding(&self, group_id: GroupId) -> Option> { - self.get_predicate_binding_group_inner(group_id) + self.get_predicate_binding_group_inner(group_id, true) + } + + /// Get all bindings of a predicate group. Will panic if the group contains more than one bindings. + pub fn try_get_predicate_binding(&self, group_id: GroupId) -> Option> { + self.get_predicate_binding_group_inner(group_id, false) } - fn get_predicate_binding_expr_inner(&self, expr_id: ExprId) -> Option> { + fn get_predicate_binding_expr_inner( + &self, + expr_id: ExprId, + panic_on_invalid_group: bool, + ) -> Option> { let expr = self.expr_id_to_expr_node[&expr_id].clone(); let mut children = Vec::with_capacity(expr.children.len()); for child in expr.children.iter() { - if let Some(child) = self.get_predicate_binding_group_inner(*child) { + if let Some(child) = + self.get_predicate_binding_group_inner(*child, panic_on_invalid_group) + { children.push(child); } else { return None; @@ -485,50 +532,65 @@ impl Memo { })) } - fn get_predicate_binding_group_inner(&self, group_id: GroupId) -> Option> { + fn get_predicate_binding_group_inner( + &self, + group_id: GroupId, + panic_on_invalid_group: bool, + ) -> Option> { let exprs = &self.groups[&group_id].group_exprs; match exprs.len() { 0 => None, - 1 => self.get_predicate_binding_expr_inner(*exprs.iter().next().unwrap()), - len => panic!("group {group_id} has {len} expressions"), + 1 => self.get_predicate_binding_expr_inner( + *exprs.iter().next().unwrap(), + panic_on_invalid_group, + ), + len => { + if panic_on_invalid_group { + panic!("group {group_id} has {len} expressions") + } else { + None + } + } } } pub fn get_best_group_binding( &self, group_id: GroupId, - meta: &mut Option, + mut post_process: impl FnMut(Arc>, GroupId, &WinnerInfo), + ) -> Result> { + self.get_best_group_binding_inner(group_id, &mut post_process) + } + + fn get_best_group_binding_inner( + &self, + group_id: GroupId, + post_process: &mut impl FnMut(Arc>, GroupId, &WinnerInfo), ) -> Result> { let info = self.get_group_info(group_id); - if let Some(winner) = info.winner { - if !winner.impossible { - let expr_id = winner.expr_id; - let expr = self.expr_id_to_expr_node[&expr_id].clone(); - let mut children = Vec::with_capacity(expr.children.len()); - for child in &expr.children { - children.push(self.get_best_group_binding(*child, meta)?); - } - let node = Arc::new(RelNode { - typ: expr.typ.clone(), - children, - data: expr.data.clone(), - }); - - if let Some(meta) = meta { - meta.insert( - node.as_ref() as *const _ as usize, - RelNodeMeta::new(group_id, winner.cost), - ); - } - return Ok(node); + if let Winner::Full(info @ WinnerInfo { expr_id, .. }) = info.winner { + let expr = self.expr_id_to_expr_node[&expr_id].clone(); + let mut children = Vec::with_capacity(expr.children.len()); + for child in &expr.children { + children.push( + self.get_best_group_binding_inner(*child, post_process) + .with_context(|| format!("when processing expr {}", expr_id))?, + ); } + let node = Arc::new(RelNode { + typ: expr.typ.clone(), + children, + data: expr.data.clone(), + }); + post_process(node.clone(), group_id, &info); + return Ok(node); } bail!("no best group binding for group {}", group_id) } pub fn clear_winner(&mut self) { for group in self.groups.values_mut() { - group.info.winner = None; + group.info.winner = Winner::Unknown; } } diff --git a/optd-core/src/cascades/optimizer.rs b/optd-core/src/cascades/optimizer.rs index 88394c4b..d5afc6da 100644 --- a/optd-core/src/cascades/optimizer.rs +++ b/optd-core/src/cascades/optimizer.rs @@ -8,10 +8,11 @@ use anyhow::Result; use tracing::trace; use crate::{ + cascades::memo::Winner, cost::CostModel, optimizer::Optimizer, property::{PropertyBuilder, PropertyBuilderAny}, - rel_node::{RelNodeMetaMap, RelNodeRef, RelNodeTyp}, + rel_node::{RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp}, rules::RuleWrapper, }; @@ -137,46 +138,24 @@ impl CascadesOptimizer { self.disabled_rules.contains(&rule_id) } - pub fn dump(&self, group_id: Option) { - if let Some(group_id) = group_id { - fn dump_inner(this: &CascadesOptimizer, group_id: GroupId) { - if let Some(ref winner) = this.memo.get_group_info(group_id).winner { - let expr = this.memo.get_expr_memoed(winner.expr_id); - assert!(!winner.impossible); - if winner.cost.0[1] == 1.0 { - return; - } - println!( - "group_id={} winner={} cost={} {}", - group_id, - winner.expr_id, - this.cost.explain(&winner.cost), - expr - ); - for child in &expr.children { - dump_inner(this, *child); - } - } - } - dump_inner(self, group_id); - return; - } + pub fn dump(&self) { for group_id in self.memo.get_all_group_ids() { - let winner = if let Some(ref winner) = self.memo.get_group_info(group_id).winner { - if winner.impossible { - "winner=".to_string() - } else { + let winner_str = match self.memo.get_group_info(group_id).winner { + Winner::Impossible => "winner=".to_string(), + Winner::Unknown => "winner=".to_string(), + Winner::Full(winner) => { + let expr = self.memo.get_expr_memoed(winner.expr_id); format!( - "winner={} cost={} {}", + "winner={} weighted_cost={} cost={} stat={} | {}", winner.expr_id, - self.cost.explain(&winner.cost), - self.memo.get_expr_memoed(winner.expr_id) + winner.total_weighted_cost, + self.cost.explain_cost(&winner.total_cost), + self.cost.explain_statistics(&winner.statistics), + expr ) } - } else { - "winner=None".to_string() }; - println!("group_id={} {}", group_id, winner); + println!("group_id={} {}", group_id, winner_str); let group = self.memo.get_group(group_id); for (id, property) in self.property_builders.iter().enumerate() { println!( @@ -185,16 +164,12 @@ impl CascadesOptimizer { property.display(group.properties[id].as_ref()) ) } + if let Some(predicate_binding) = self.memo.try_get_predicate_binding(group_id) { + println!(" predicate={}", predicate_binding); + } for expr_id in self.memo.get_all_exprs_in_group(group_id) { let memo_node = self.memo.get_expr_memoed(expr_id); println!(" expr_id={} | {}", expr_id, memo_node); - // We removed get all bindings functionality - // let bindings = self - // .memo - // .get_all_expr_bindings(expr_id, false, true, Some(1)); - // for binding in bindings { - // println!(" {}", binding); - // } } } } @@ -226,7 +201,26 @@ impl CascadesOptimizer { group_id: GroupId, meta: &mut Option, ) -> Result> { - self.memo.get_best_group_binding(group_id, meta) + let res = self + .memo + .get_best_group_binding(group_id, |node, group_id, info| { + if let Some(meta) = meta { + let node = node.as_ref() as *const _ as usize; + let node_meta = RelNodeMeta::new( + group_id, + info.total_weighted_cost, + info.total_cost.clone(), + info.statistics.clone(), + self.cost.explain_cost(&info.total_cost), + self.cost.explain_statistics(&info.statistics), + ); + meta.insert(node, node_meta); + } + }); + if res.is_err() && cfg!(debug_assertions) { + self.dump(); + } + res } fn fire_optimize_tasks(&mut self, group_id: GroupId) -> Result<()> { @@ -268,16 +262,13 @@ impl CascadesOptimizer { } } } - // if self.ctx.budget_used { - // self.dump(None); - // } Ok(()) } fn optimize_inner(&mut self, root_rel: RelNodeRef) -> Result> { let (group_id, _) = self.add_new_expr(root_rel); self.fire_optimize_tasks(group_id)?; - self.memo.get_best_group_binding(group_id, &mut None) + self.memo.get_best_group_binding(group_id, |_, _, _| {}) } pub fn resolve_group_id(&self, root_rel: RelNodeRef) -> GroupId { @@ -377,15 +368,6 @@ impl CascadesOptimizer { .insert(rule_id); } - pub fn get_cost_of(&self, group_id: GroupId) -> f64 { - self.memo - .get_group_info(group_id) - .winner - .as_ref() - .map(|x| x.cost.0[0]) - .unwrap_or(0.0) - } - pub fn memo(&self) -> &Memo { &self.memo } diff --git a/optd-core/src/cascades/tasks/optimize_group.rs b/optd-core/src/cascades/tasks/optimize_group.rs index c10c7cb9..14adf725 100644 --- a/optd-core/src/cascades/tasks/optimize_group.rs +++ b/optd-core/src/cascades/tasks/optimize_group.rs @@ -26,7 +26,7 @@ impl Task for OptimizeGroupTask { fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { trace!(event = "task_begin", task = "optimize_group", group_id = %self.group_id); let group_info = optimizer.get_group_info(self.group_id); - if group_info.winner.is_some() { + if group_info.winner.has_decided() { trace!(event = "task_finish", task = "optimize_group"); return Ok(vec![]); } diff --git a/optd-core/src/cascades/tasks/optimize_inputs.rs b/optd-core/src/cascades/tasks/optimize_inputs.rs index b6ec1d82..f84ab647 100644 --- a/optd-core/src/cascades/tasks/optimize_inputs.rs +++ b/optd-core/src/cascades/tasks/optimize_inputs.rs @@ -3,12 +3,12 @@ use tracing::trace; use crate::{ cascades::{ - memo::{GroupInfo, Winner}, + memo::{GroupInfo, Winner, WinnerInfo}, optimizer::ExprId, tasks::OptimizeGroupTask, - CascadesOptimizer, GroupId, RelNodeContext, + CascadesOptimizer, RelNodeContext, }, - cost::Cost, + cost::{Cost, Statistics}, rel_node::RelNodeTyp, }; @@ -17,7 +17,6 @@ use super::Task; #[derive(Debug, Clone)] struct ContinueTask { next_group_idx: usize, - input_cost: Vec, return_from_optimize_group: bool, } @@ -44,71 +43,64 @@ impl OptimizeInputsTask { } } - /// first invoke of this task, compute the cost of children - fn first_invoke( - &self, - children: &[GroupId], - optimizer: &mut CascadesOptimizer, - ) -> Vec { - let zero_cost = optimizer.cost().zero(); - let mut input_cost = Vec::with_capacity(children.len()); - for &child in children.iter() { - let group = optimizer.get_group_info(child); - if let Some(ref winner) = group.winner { - if !winner.impossible { - // the full winner case - input_cost.push(winner.cost.clone()); - continue; - } - } - input_cost.push(zero_cost.clone()); - } - input_cost - } - - fn should_terminate(&self, cost_so_far: f64, upper_bound: Option) -> bool { - if !self.pruning { - return false; - } - if upper_bound.is_none() { - return false; - } - let upper_bound = upper_bound.unwrap(); - if cost_so_far >= upper_bound { - trace!( - event = "optimize_inputs_pruning", - task = "optimize_inputs_pruning", - cost_so_far = cost_so_far, - upper_bound = upper_bound + fn update_winner_impossible(&self, optimizer: &mut CascadesOptimizer) { + let group_id = optimizer.get_group_id(self.expr_id); + if let Winner::Unknown = optimizer.get_group_info(group_id).winner { + optimizer.update_group_info( + group_id, + GroupInfo { + winner: Winner::Impossible, + }, ); - return true; } - false } fn update_winner( &self, - cost_so_far: &Cost, + input_statistics: Vec>, + operation_cost: Cost, + total_cost: Cost, optimizer: &mut CascadesOptimizer, ) { let group_id = optimizer.get_group_id(self.expr_id); let group_info = optimizer.get_group_info(group_id); + let cost = optimizer.cost(); + let operation_weighted_cost = cost.weighted_cost(&operation_cost); + let total_weighted_cost = cost.weighted_cost(&total_cost); let mut update_cost = false; - if let Some(ref winner) = group_info.winner { - if winner.impossible || &winner.cost > cost_so_far { + if let Some(winner) = group_info.winner.as_full_winner() { + if winner.total_weighted_cost > total_weighted_cost { update_cost = true; } } else { update_cost = true; } if update_cost { + let expr = optimizer.get_expr_memoed(self.expr_id); + let statistics = cost.derive_statistics( + &expr.typ, + &expr.data, + &input_statistics + .iter() + .map(|x| x.expect("child winner should always have statistics?")) + .collect::>(), + Some(RelNodeContext { + group_id, + expr_id: self.expr_id, + children_group_ids: expr.children.clone(), + }), + Some(optimizer), + ); optimizer.update_group_info( group_id, GroupInfo { - winner: Some(Winner { - impossible: false, + winner: Winner::Full(WinnerInfo { expr_id: self.expr_id, - cost: cost_so_far.clone(), + total_weighted_cost, + operation_weighted_cost, + total_cost, + operation_cost, + statistics: statistics.into(), }), }, ); @@ -127,15 +119,15 @@ impl Task for OptimizeInputsTask { } optimizer.mark_expr_explored(self.expr_id); } - trace!(event = "task_begin", task = "optimize_inputs", expr_id = %self.expr_id, continue_from = ?self.continue_from); let expr = optimizer.get_expr_memoed(self.expr_id); let group_id = optimizer.get_group_id(self.expr_id); let children_group_ids = &expr.children; let cost = optimizer.cost(); + trace!(event = "task_begin", task = "optimize_inputs", expr_id = %self.expr_id, continue_from = ?self.continue_from, total_children = %children_group_ids.len()); + if let Some(ContinueTask { next_group_idx, - mut input_cost, return_from_optimize_group, }) = self.continue_from.clone() { @@ -144,127 +136,81 @@ impl Task for OptimizeInputsTask { group_id, children_group_ids: children_group_ids.clone(), }; - if self.should_terminate( - cost.sum( - &cost.compute_cost( - &expr.typ, - &expr.data, - &input_cost, - Some(context.clone()), - Some(optimizer), - ), - &input_cost, - ) - .0[0], - optimizer.ctx.upper_bound, - ) { - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); - return Ok(vec![]); - } + let input_statistics = children_group_ids + .iter() + .map(|&group_id| { + optimizer + .get_group_info(group_id) + .winner + .as_full_winner() + .map(|x| x.statistics.clone()) + }) + .collect::>(); + let input_statistics_ref = input_statistics + .iter() + .map(|x| x.as_deref()) + .collect::>(); + let input_cost = children_group_ids + .iter() + .map(|&group_id| { + optimizer + .get_group_info(group_id) + .winner + .as_full_winner() + .map(|x| x.total_cost.clone()) + .unwrap_or_else(|| cost.zero()) + }) + .collect::>(); + let operation_cost = cost.compute_operation_cost( + &expr.typ, + &expr.data, + &input_statistics_ref, + &input_cost, + Some(context.clone()), + Some(optimizer), + ); + let total_cost = cost.sum(&operation_cost, &input_cost); if next_group_idx < children_group_ids.len() { - let group_id = children_group_ids[next_group_idx]; + let child_group_id = children_group_ids[next_group_idx]; let group_idx = next_group_idx; - let group_info = optimizer.get_group_info(group_id); - let mut has_full_winner = false; - if let Some(ref winner) = group_info.winner { - if !winner.impossible { - input_cost[group_idx] = winner.cost.clone(); - has_full_winner = true; - if self.should_terminate( - cost.sum( - &cost.compute_cost( - &expr.typ, - &expr.data, - &input_cost, - Some(context.clone()), - Some(optimizer), - ), - &input_cost, - ) - .0[0], - optimizer.ctx.upper_bound, - ) { - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); - return Ok(vec![]); - } - } - } - if !has_full_winner { + let group_info = optimizer.get_group_info(child_group_id); + if !group_info.winner.has_full_winner() { if !return_from_optimize_group { - trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id, group_idx = %group_idx); + trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id, group_idx = %group_idx, yield_to = "optimize_group", optimize_group_id = %child_group_id); return Ok(vec![ Box::new(self.continue_from( ContinueTask { next_group_idx, - input_cost, return_from_optimize_group: true, }, self.pruning, )) as Box>, - Box::new(OptimizeGroupTask::new(group_id)) as Box>, + Box::new(OptimizeGroupTask::new(child_group_id)) as Box>, ]); } else { - if let Some(ref winner) = group_info.winner { - if winner.impossible { - optimizer.update_group_info( - group_id, - GroupInfo { - winner: Some(Winner { - impossible: true, - ..Default::default() - }), - }, - ); - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); - return Ok(vec![]); - } - } - optimizer.update_group_info( - group_id, - GroupInfo { - winner: Some(Winner { - impossible: true, - ..Default::default() - }), - }, - ); - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); + self.update_winner_impossible(optimizer); + trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id, "result" = "impossible"); return Ok(vec![]); } } - trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id, group_idx = %group_idx); + trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id, group_idx = %group_idx, yield_to = "next_optimize_input"); Ok(vec![Box::new(self.continue_from( ContinueTask { next_group_idx: group_idx + 1, - input_cost, return_from_optimize_group: false, }, self.pruning, )) as Box>]) } else { - self.update_winner( - &cost.sum( - &cost.compute_cost( - &expr.typ, - &expr.data, - &input_cost, - Some(context.clone()), - Some(optimizer), - ), - &input_cost, - ), - optimizer, - ); - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id); + self.update_winner(input_statistics_ref, operation_cost, total_cost, optimizer); + trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id, "result" = "optimized"); Ok(vec![]) } } else { - let input_cost = self.first_invoke(children_group_ids, optimizer); trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id); Ok(vec![Box::new(self.continue_from( ContinueTask { next_group_idx: 0, - input_cost, return_from_optimize_group: false, }, self.pruning, diff --git a/optd-core/src/cost.rs b/optd-core/src/cost.rs index 8d387a61..eda1d389 100644 --- a/optd-core/src/cost.rs +++ b/optd-core/src/cost.rs @@ -1,35 +1,57 @@ use crate::{ cascades::{CascadesOptimizer, RelNodeContext}, - rel_node::{RelNode, RelNodeTyp, Value}, + rel_node::{RelNodeTyp, Value}, }; +/// The statistics of a group. +#[derive(Clone, Debug)] +pub struct Statistics(pub value_bag::OwnedValueBag); + +/// The cost of an operation. The cost is represented as a vector of double values. +/// For example, it can be represented as `[compute_cost, io_cost]`. +/// A lower value means a better cost. #[derive(Default, Clone, Debug, PartialOrd, PartialEq)] pub struct Cost(pub Vec); pub trait CostModel: 'static + Send + Sync { - fn compute_cost( + /// Compute the cost of a single operation + fn compute_operation_cost( &self, node: &T, data: &Option, - children: &[Cost], + children: &[Option<&Statistics>], + children_cost: &[Cost], context: Option, - // one reason we need the optimizer is to traverse children nodes to build up an expression tree optimizer: Option<&CascadesOptimizer>, ) -> Cost; - fn compute_plan_node_cost(&self, node: &RelNode) -> Cost; + /// Derive the statistics of a single operation + fn derive_statistics( + &self, + node: &T, + data: &Option, + children: &[&Statistics], + context: Option, + optimizer: Option<&CascadesOptimizer>, + ) -> Statistics; + + fn explain_cost(&self, cost: &Cost) -> String; - fn explain(&self, cost: &Cost) -> String; + fn explain_statistics(&self, cost: &Statistics) -> String; fn accumulate(&self, total_cost: &mut Cost, cost: &Cost); - fn sum(&self, self_cost: &Cost, inputs: &[Cost]) -> Cost { - let mut total_cost = self_cost.clone(); - for input in inputs { + fn sum(&self, operation_cost: &Cost, inputs_cost: &[Cost]) -> Cost { + let mut total_cost = operation_cost.clone(); + for input in inputs_cost { self.accumulate(&mut total_cost, input); } total_cost } + /// The zero cost. fn zero(&self) -> Cost; + + /// The weighted cost of a compound cost. + fn weighted_cost(&self, cost: &Cost) -> f64; } diff --git a/optd-core/src/rel_node.rs b/optd-core/src/rel_node.rs index 9511d11e..8af2fc2c 100644 --- a/optd-core/src/rel_node.rs +++ b/optd-core/src/rel_node.rs @@ -13,7 +13,10 @@ use chrono::NaiveDate; use ordered_float::OrderedFloat; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use crate::{cascades::GroupId, cost::Cost}; +use crate::{ + cascades::GroupId, + cost::{Cost, Statistics}, +}; pub type RelNodeRef = Arc>; @@ -264,17 +267,41 @@ impl RelNode { } /// Metadata for a rel node. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub struct RelNodeMeta { /// The group (id) of the `RelNode` pub group_id: GroupId, + /// Weighted cost of the `RelNode` + pub weighted_cost: f64, /// Cost of the `RelNode` pub cost: Cost, + /// Statistics + pub stat: Arc, + /// Cost in display string + /// TODO: this should be lazily processed and generated + pub cost_display: String, + /// Statistics in display string + /// TODO: this should be lazily processed and generated + pub stat_display: String, } impl RelNodeMeta { - pub fn new(group_id: GroupId, cost: Cost) -> Self { - RelNodeMeta { group_id, cost } + pub fn new( + group_id: GroupId, + weighted_cost: f64, + cost: Cost, + stat: Arc, + cost_display: String, + stat_display: String, + ) -> Self { + RelNodeMeta { + group_id, + weighted_cost, + cost, + stat, + cost_display, + stat_display, + } } } diff --git a/optd-datafusion-repr-adv-cost/src/adaptive_cost.rs b/optd-datafusion-repr-adv-cost/src/adaptive_cost.rs deleted file mode 100644 index 8841cac1..00000000 --- a/optd-datafusion-repr-adv-cost/src/adaptive_cost.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::sync::{Arc, Mutex}; - -use crate::adv_cost::OptCostModel; -use optd_core::{ - cascades::{CascadesOptimizer, RelNodeContext}, - cost::{Cost, CostModel}, - rel_node::{RelNode, Value}, -}; -use optd_datafusion_repr::{ - cost::adaptive_cost::RuntimeAdaptionStorageInner, plan_nodes::OptRelNodeTyp, -}; -use serde::{de::DeserializeOwned, Serialize}; - -use super::adv_cost::stats::{ - BaseTableStats, DataFusionDistribution, DataFusionMostCommonValues, Distribution, - MostCommonValues, -}; - -pub type RuntimeAdaptionStorage = Arc>; -pub type DataFusionAdaptiveCostModel = - AdaptiveCostModel; - -pub const DEFAULT_DECAY: usize = 50; - -pub struct AdaptiveCostModel< - M: MostCommonValues + Serialize + DeserializeOwned, - D: Distribution + Serialize + DeserializeOwned, -> { - runtime_row_cnt: RuntimeAdaptionStorage, - base_model: OptCostModel, - decay: usize, -} - -impl< - M: MostCommonValues + Serialize + DeserializeOwned, - D: Distribution + Serialize + DeserializeOwned, - > CostModel for AdaptiveCostModel -{ - fn explain(&self, cost: &Cost) -> String { - self.base_model.explain(cost) - } - - fn accumulate(&self, total_cost: &mut Cost, cost: &Cost) { - self.base_model.accumulate(total_cost, cost) - } - - fn zero(&self) -> Cost { - self.base_model.zero() - } - - fn compute_cost( - &self, - node: &OptRelNodeTyp, - data: &Option, - children: &[Cost], - context: Option, - optimizer: Option<&CascadesOptimizer>, - ) -> Cost { - if let OptRelNodeTyp::PhysicalScan = node { - let guard = self.runtime_row_cnt.lock().unwrap(); - if let Some((runtime_row_cnt, iter)) = - guard.history.get(&context.as_ref().unwrap().group_id) - { - if *iter + self.decay >= guard.iter_cnt { - let runtime_row_cnt = (*runtime_row_cnt).max(1) as f64; - return OptCostModel::::cost(runtime_row_cnt, 0.0, runtime_row_cnt); - } - } - } - let (mut row_cnt, compute_cost, io_cost) = OptCostModel::::cost_tuple( - &self - .base_model - .compute_cost(node, data, children, context.clone(), optimizer), - ); - if let Some(context) = context { - let guard = self.runtime_row_cnt.lock().unwrap(); - if let Some((runtime_row_cnt, iter)) = guard.history.get(&context.group_id) { - if *iter + self.decay >= guard.iter_cnt { - let runtime_row_cnt = (*runtime_row_cnt).max(1) as f64; - row_cnt = runtime_row_cnt; - } - } - } - OptCostModel::::cost(row_cnt, compute_cost, io_cost) - } - - fn compute_plan_node_cost(&self, node: &RelNode) -> Cost { - self.base_model.compute_plan_node_cost(node) - } -} - -impl< - M: MostCommonValues + Serialize + DeserializeOwned, - D: Distribution + Serialize + DeserializeOwned, - > AdaptiveCostModel -{ - pub fn new(decay: usize, stats: BaseTableStats) -> Self { - Self { - runtime_row_cnt: RuntimeAdaptionStorage::default(), - base_model: OptCostModel::new(stats), - decay, - } - } - - pub fn get_runtime_map(&self) -> RuntimeAdaptionStorage { - self.runtime_row_cnt.clone() - } -} diff --git a/optd-datafusion-repr-adv-cost/src/adv_cost.rs b/optd-datafusion-repr-adv-cost/src/adv_stats.rs similarity index 66% rename from optd-datafusion-repr-adv-cost/src/adv_cost.rs rename to optd-datafusion-repr-adv-cost/src/adv_stats.rs index bddd0ef3..c8c287d3 100644 --- a/optd-datafusion-repr-adv-cost/src/adv_cost.rs +++ b/optd-datafusion-repr-adv-cost/src/adv_stats.rs @@ -4,42 +4,18 @@ mod join; mod limit; pub mod stats; -use itertools::Itertools; -use optd_core::{ - cascades::{CascadesOptimizer, RelNodeContext}, - cost::{Cost, CostModel}, - rel_node::{RelNode, RelNodeTyp, Value}, -}; -use optd_datafusion_repr::{ - plan_nodes::OptRelNodeTyp, - properties::column_ref::{BaseTableColumnRef, ColumnRef}, -}; +use optd_datafusion_repr::properties::column_ref::{BaseTableColumnRef, ColumnRef}; use serde::{de::DeserializeOwned, Serialize}; -use super::adv_cost::stats::{ +use super::adv_stats::stats::{ BaseTableStats, ColumnCombValueStats, Distribution, MostCommonValues, }; -fn compute_plan_node_cost>( - model: &C, - node: &RelNode, - total_cost: &mut Cost, -) -> Cost { - let children = node - .children - .iter() - .map(|child| compute_plan_node_cost(model, child, total_cost)) - .collect_vec(); - let cost = model.compute_cost(&node.typ, &node.data, &children, None, None); - model.accumulate(total_cost, &cost); - cost -} - -pub struct OptCostModel< +pub struct AdvStats< M: MostCommonValues + Serialize + DeserializeOwned, D: Distribution + Serialize + DeserializeOwned, > { - per_table_stats_map: BaseTableStats, + pub(crate) per_table_stats_map: BaseTableStats, } // Default statistics. All are from selfuncs.h in Postgres unless specified otherwise @@ -55,150 +31,10 @@ const DEFAULT_UNK_SEL: f64 = 0.005; // A placeholder for unimplemented!() for codepaths which are accessed by plannertest const UNIMPLEMENTED_SEL: f64 = 0.01; -pub const ROW_COUNT: usize = 1; -pub const COMPUTE_COST: usize = 2; -pub const IO_COST: usize = 3; - -impl< - M: MostCommonValues + Serialize + DeserializeOwned, - D: Distribution + Serialize + DeserializeOwned, - > OptCostModel -{ - pub fn row_cnt(Cost(cost): &Cost) -> f64 { - cost[ROW_COUNT] - } - - pub fn compute_cost(Cost(cost): &Cost) -> f64 { - cost[COMPUTE_COST] - } - - pub fn io_cost(Cost(cost): &Cost) -> f64 { - cost[IO_COST] - } - - pub fn cost_tuple(Cost(cost): &Cost) -> (f64, f64, f64) { - (cost[ROW_COUNT], cost[COMPUTE_COST], cost[IO_COST]) - } - - pub fn weighted_cost(row_cnt: f64, compute_cost: f64, io_cost: f64) -> f64 { - let _ = row_cnt; - compute_cost + io_cost - } - - pub fn cost(row_cnt: f64, compute_cost: f64, io_cost: f64) -> Cost { - Cost(vec![ - Self::weighted_cost(row_cnt, compute_cost, io_cost), - row_cnt, - compute_cost, - io_cost, - ]) - } -} - -impl< - M: MostCommonValues + Serialize + DeserializeOwned, - D: Distribution + Serialize + DeserializeOwned, - > CostModel for OptCostModel -{ - fn explain(&self, cost: &Cost) -> String { - format!( - "weighted={},row_cnt={},compute={},io={}", - cost.0[0], - Self::row_cnt(cost), - Self::compute_cost(cost), - Self::io_cost(cost) - ) - } - - fn accumulate(&self, total_cost: &mut Cost, cost: &Cost) { - // do not accumulate row count - total_cost.0[COMPUTE_COST] += Self::compute_cost(cost); - total_cost.0[IO_COST] += Self::io_cost(cost); - total_cost.0[0] = Self::weighted_cost( - total_cost.0[ROW_COUNT], - total_cost.0[COMPUTE_COST], - total_cost.0[IO_COST], - ); - } - - fn zero(&self) -> Cost { - Self::cost(0.0, 0.0, 0.0) - } - - fn compute_cost( - &self, - node: &OptRelNodeTyp, - data: &Option, - children: &[Cost], - context: Option, - optimizer: Option<&CascadesOptimizer>, - ) -> Cost { - match node { - OptRelNodeTyp::PhysicalScan => { - let table = data.as_ref().unwrap().as_str(); - let row_cnt = self - .per_table_stats_map - .get(table.as_ref()) - .map(|per_table_stats| per_table_stats.row_cnt) - .unwrap_or(1) as f64; - Self::cost(row_cnt, 0.0, row_cnt) - } - OptRelNodeTyp::PhysicalEmptyRelation => Self::cost(0.5, 0.01, 0.0), - OptRelNodeTyp::PhysicalLimit => Self::get_limit_cost(children, context, optimizer), - OptRelNodeTyp::PhysicalFilter => self.get_filter_cost(children, context, optimizer), - OptRelNodeTyp::PhysicalNestedLoopJoin(join_typ) => { - self.get_nlj_cost(*join_typ, children, context, optimizer) - } - OptRelNodeTyp::PhysicalProjection => { - let (row_cnt, _, _) = Self::cost_tuple(&children[0]); - let (_, compute_cost, _) = Self::cost_tuple(&children[1]); - Self::cost(row_cnt, compute_cost * row_cnt, 0.0) - } - OptRelNodeTyp::PhysicalHashJoin(join_typ) => { - self.get_hash_join_cost(*join_typ, children, context, optimizer) - } - OptRelNodeTyp::PhysicalSort => { - let (row_cnt, _, _) = Self::cost_tuple(&children[0]); - Self::cost(row_cnt, row_cnt * row_cnt.ln_1p().max(1.0), 0.0) - } - OptRelNodeTyp::PhysicalAgg => self.get_agg_cost(children, context, optimizer), - OptRelNodeTyp::List => { - let compute_cost = children - .iter() - .map(|child| { - let (_, compute_cost, _) = Self::cost_tuple(child); - compute_cost - }) - .sum::(); - Self::cost(1.0, compute_cost + 0.01, 0.0) - } - OptRelNodeTyp::ColumnRef => Self::cost(1.0, 0.01, 0.0), - _ if node.is_expression() => { - let compute_cost = children - .iter() - .map(|child| { - let (_, compute_cost, _) = Self::cost_tuple(child); - compute_cost - }) - .sum::(); - Self::cost(1.0, compute_cost + 1.0, 0.0) - } - x => unimplemented!("cannot compute cost for {}", x), - } - } - - fn compute_plan_node_cost(&self, node: &RelNode) -> Cost { - let mut cost = self.zero(); - let top = compute_plan_node_cost(self, node, &mut cost); - cost.0[ROW_COUNT] = top.0[ROW_COUNT]; - cost - } -} - impl< M: MostCommonValues + Serialize + DeserializeOwned, D: Distribution + Serialize + DeserializeOwned, - > OptCostModel + > AdvStats { pub fn new(per_table_stats_map: BaseTableStats) -> Self { Self { @@ -245,7 +81,7 @@ mod tests { use super::{stats::*, *}; pub type TestPerColumnStats = ColumnCombValueStats; - pub type TestOptCostModel = OptCostModel; + pub type TestOptCostModel = AdvStats; #[derive(Serialize, Deserialize)] pub struct TestMostCommonValues { @@ -319,7 +155,7 @@ mod tests { // one column is sufficient for all filter selectivity tests pub fn create_one_column_cost_model(per_column_stats: TestPerColumnStats) -> TestOptCostModel { - OptCostModel::new( + AdvStats::new( vec![( String::from(TABLE1_NAME), TableStats::new(100, vec![(vec![0], per_column_stats)].into_iter().collect()), @@ -348,7 +184,7 @@ mod tests { tbl2_per_column_stats: TestPerColumnStats, tbl3_per_column_stats: TestPerColumnStats, ) -> TestOptCostModel { - OptCostModel::new( + AdvStats::new( vec![ ( String::from(TABLE1_NAME), @@ -384,7 +220,7 @@ mod tests { tbl3_per_column_stats: TestPerColumnStats, tbl4_per_column_stats: TestPerColumnStats, ) -> TestOptCostModel { - OptCostModel::new( + AdvStats::new( vec![ ( String::from(TABLE1_NAME), @@ -427,7 +263,7 @@ mod tests { tbl1_row_cnt: usize, tbl2_row_cnt: usize, ) -> TestOptCostModel { - OptCostModel::new( + AdvStats::new( vec![ ( String::from(TABLE1_NAME), @@ -523,7 +359,7 @@ mod tests { /// The reason this isn't an associated function of PerColumnStats is because that would require /// adding an empty() function to the trait definitions of MostCommonValues and Distribution, /// which I wanted to avoid - pub fn get_empty_per_col_stats() -> TestPerColumnStats { + pub(crate) fn get_empty_per_col_stats() -> TestPerColumnStats { TestPerColumnStats::new( TestMostCommonValues::empty(), 0, diff --git a/optd-datafusion-repr-adv-cost/src/adv_cost/agg.rs b/optd-datafusion-repr-adv-cost/src/adv_stats/agg.rs similarity index 76% rename from optd-datafusion-repr-adv-cost/src/adv_cost/agg.rs rename to optd-datafusion-repr-adv-cost/src/adv_stats/agg.rs index e2d99fc6..fc8adb14 100644 --- a/optd-datafusion-repr-adv-cost/src/adv_cost/agg.rs +++ b/optd-datafusion-repr-adv-cost/src/adv_stats/agg.rs @@ -1,10 +1,7 @@ -use optd_core::{ - cascades::{CascadesOptimizer, RelNodeContext}, - cost::Cost, -}; +use optd_core::cascades::{CascadesOptimizer, RelNodeContext}; use serde::{de::DeserializeOwned, Serialize}; -use crate::adv_cost::{ +use crate::adv_stats::{ stats::{Distribution, MostCommonValues}, DEFAULT_NUM_DISTINCT, }; @@ -13,31 +10,14 @@ use optd_datafusion_repr::{ properties::column_ref::{BaseTableColumnRef, ColumnRef, ColumnRefPropertyBuilder}, }; -use super::{OptCostModel, DEFAULT_UNK_SEL}; +use super::{AdvStats, DEFAULT_UNK_SEL}; impl< M: MostCommonValues + Serialize + DeserializeOwned, D: Distribution + Serialize + DeserializeOwned, - > OptCostModel + > AdvStats { - pub(super) fn get_agg_cost( - &self, - children: &[Cost], - context: Option, - optimizer: Option<&CascadesOptimizer>, - ) -> Cost { - let child_row_cnt = Self::row_cnt(&children[0]); - let row_cnt = self.get_agg_row_cnt(context, optimizer, child_row_cnt); - let (_, compute_cost_1, _) = Self::cost_tuple(&children[1]); - let (_, compute_cost_2, _) = Self::cost_tuple(&children[2]); - Self::cost( - row_cnt, - child_row_cnt * (compute_cost_1 + compute_cost_2), - 0.0, - ) - } - - fn get_agg_row_cnt( + pub(crate) fn get_agg_row_cnt( &self, context: Option, optimizer: Option<&CascadesOptimizer>, diff --git a/optd-datafusion-repr-adv-cost/src/adv_cost/filter.rs b/optd-datafusion-repr-adv-cost/src/adv_stats/filter.rs similarity index 98% rename from optd-datafusion-repr-adv-cost/src/adv_cost/filter.rs rename to optd-datafusion-repr-adv-cost/src/adv_stats/filter.rs index 2365714e..f64fef23 100644 --- a/optd-datafusion-repr-adv-cost/src/adv_cost/filter.rs +++ b/optd-datafusion-repr-adv-cost/src/adv_stats/filter.rs @@ -2,12 +2,11 @@ use std::ops::Bound; use optd_core::{ cascades::{CascadesOptimizer, RelNodeContext}, - cost::Cost, rel_node::Value, }; use serde::{de::DeserializeOwned, Serialize}; -use crate::adv_cost::{ +use crate::adv_stats::{ stats::{ColumnCombValueStats, Distribution, MostCommonValues}, UNIMPLEMENTED_SEL, }; @@ -24,9 +23,7 @@ use optd_datafusion_repr::{ }, }; -use super::{ - stats::ColumnCombValue, OptCostModel, DEFAULT_EQ_SEL, DEFAULT_INEQ_SEL, DEFAULT_UNK_SEL, -}; +use super::{stats::ColumnCombValue, AdvStats, DEFAULT_EQ_SEL, DEFAULT_INEQ_SEL, DEFAULT_UNK_SEL}; mod in_list; mod like; @@ -34,16 +31,14 @@ mod like; impl< M: MostCommonValues + Serialize + DeserializeOwned, D: Distribution + Serialize + DeserializeOwned, - > OptCostModel + > AdvStats { - pub(super) fn get_filter_cost( + pub(crate) fn get_filter_row_cnt( &self, - children: &[Cost], + child_row_cnt: f64, context: Option, optimizer: Option<&CascadesOptimizer>, - ) -> Cost { - let (row_cnt, _, _) = Self::cost_tuple(&children[0]); - let (_, compute_cost, _) = Self::cost_tuple(&children[1]); + ) -> f64 { let selectivity = if let (Some(context), Some(optimizer)) = (context, optimizer) { let schema = optimizer.get_property_by_group::(context.group_id, 0); @@ -58,11 +53,7 @@ impl< } else { DEFAULT_UNK_SEL }; - Self::cost( - (row_cnt * selectivity).max(1.0), - row_cnt * compute_cost, - 0.0, - ) + (child_row_cnt * selectivity).max(1.0) } /// The expr_tree input must be a "mixed expression tree". @@ -570,7 +561,7 @@ mod tests { use arrow_schema::DataType; use optd_core::rel_node::Value; - use crate::adv_cost::{tests::*, DEFAULT_EQ_SEL}; + use crate::adv_stats::{tests::*, DEFAULT_EQ_SEL}; use optd_datafusion_repr::{ plan_nodes::{BinOpType, ConstantType, LogOpType, UnOpType}, properties::{ diff --git a/optd-datafusion-repr-adv-cost/src/adv_cost/filter/in_list.rs b/optd-datafusion-repr-adv-cost/src/adv_stats/filter/in_list.rs similarity index 97% rename from optd-datafusion-repr-adv-cost/src/adv_cost/filter/in_list.rs rename to optd-datafusion-repr-adv-cost/src/adv_stats/filter/in_list.rs index da714257..248ae324 100644 --- a/optd-datafusion-repr-adv-cost/src/adv_cost/filter/in_list.rs +++ b/optd-datafusion-repr-adv-cost/src/adv_stats/filter/in_list.rs @@ -1,8 +1,8 @@ use serde::{de::DeserializeOwned, Serialize}; -use crate::adv_cost::{ +use crate::adv_stats::{ stats::{Distribution, MostCommonValues}, - OptCostModel, UNIMPLEMENTED_SEL, + AdvStats, UNIMPLEMENTED_SEL, }; use optd_datafusion_repr::{ plan_nodes::{ColumnRefExpr, ConstantExpr, InListExpr, OptRelNode, OptRelNodeTyp}, @@ -12,7 +12,7 @@ use optd_datafusion_repr::{ impl< M: MostCommonValues + Serialize + DeserializeOwned, D: Distribution + Serialize + DeserializeOwned, - > OptCostModel + > AdvStats { /// Only support colA in (val1, val2, val3) where colA is a column ref and /// val1, val2, val3 are constants. @@ -76,7 +76,7 @@ impl< mod tests { use optd_core::rel_node::Value; - use crate::adv_cost::tests::{ + use crate::adv_stats::tests::{ create_one_column_cost_model, in_list, TestDistribution, TestMostCommonValues, TestPerColumnStats, TABLE1_NAME, }; diff --git a/optd-datafusion-repr-adv-cost/src/adv_cost/filter/like.rs b/optd-datafusion-repr-adv-cost/src/adv_stats/filter/like.rs similarity index 98% rename from optd-datafusion-repr-adv-cost/src/adv_cost/filter/like.rs rename to optd-datafusion-repr-adv-cost/src/adv_stats/filter/like.rs index 475ccef7..b8556b99 100644 --- a/optd-datafusion-repr-adv-cost/src/adv_cost/filter/like.rs +++ b/optd-datafusion-repr-adv-cost/src/adv_stats/filter/like.rs @@ -1,9 +1,9 @@ use datafusion::arrow::{array::StringArray, compute::like}; use serde::{de::DeserializeOwned, Serialize}; -use crate::adv_cost::{ +use crate::adv_stats::{ stats::{ColumnCombValue, Distribution, MostCommonValues}, - OptCostModel, UNIMPLEMENTED_SEL, + AdvStats, UNIMPLEMENTED_SEL, }; use optd_datafusion_repr::{ plan_nodes::{ColumnRefExpr, ConstantExpr, LikeExpr, OptRelNode, OptRelNodeTyp}, @@ -21,7 +21,7 @@ const FIXED_CHAR_SEL_FACTOR: f64 = 0.2; impl< M: MostCommonValues + Serialize + DeserializeOwned, D: Distribution + Serialize + DeserializeOwned, - > OptCostModel + > AdvStats { /// Compute the selectivity of a (NOT) LIKE expression. /// @@ -113,7 +113,7 @@ impl< mod tests { use optd_core::rel_node::Value; - use crate::adv_cost::{ + use crate::adv_stats::{ filter::like::{FIXED_CHAR_SEL_FACTOR, FULL_WILDCARD_SEL_FACTOR}, tests::{ create_one_column_cost_model, like, TestDistribution, TestMostCommonValues, diff --git a/optd-datafusion-repr-adv-cost/src/adv_cost/join.rs b/optd-datafusion-repr-adv-cost/src/adv_stats/join.rs similarity index 97% rename from optd-datafusion-repr-adv-cost/src/adv_cost/join.rs rename to optd-datafusion-repr-adv-cost/src/adv_stats/join.rs index e1f5d105..9e56f0b6 100644 --- a/optd-datafusion-repr-adv-cost/src/adv_cost/join.rs +++ b/optd-datafusion-repr-adv-cost/src/adv_stats/join.rs @@ -1,13 +1,10 @@ use std::collections::HashSet; use itertools::Itertools; -use optd_core::{ - cascades::{CascadesOptimizer, RelNodeContext}, - cost::Cost, -}; +use optd_core::cascades::{CascadesOptimizer, RelNodeContext}; use serde::{de::DeserializeOwned, Serialize}; -use crate::adv_cost::{ +use crate::adv_stats::{ stats::{Distribution, MostCommonValues}, DEFAULT_NUM_DISTINCT, }; @@ -25,23 +22,21 @@ use optd_datafusion_repr::{ }, }; -use super::{OptCostModel, DEFAULT_UNK_SEL}; +use super::{AdvStats, DEFAULT_UNK_SEL}; impl< M: MostCommonValues + Serialize + DeserializeOwned, D: Distribution + Serialize + DeserializeOwned, - > OptCostModel + > AdvStats { - pub(super) fn get_nlj_cost( + pub(crate) fn get_nlj_row_cnt( &self, join_typ: JoinType, - children: &[Cost], + left_row_cnt: f64, + right_row_cnt: f64, context: Option, optimizer: Option<&CascadesOptimizer>, - ) -> Cost { - let (row_cnt_1, _, _) = Self::cost_tuple(&children[0]); - let (row_cnt_2, _, _) = Self::cost_tuple(&children[1]); - let (_, compute_cost, _) = Self::cost_tuple(&children[2]); + ) -> f64 { let selectivity = if let (Some(context), Some(optimizer)) = (context, optimizer) { let schema = optimizer.get_property_by_group::(context.group_id, 0); @@ -59,28 +54,23 @@ impl< &schema, column_refs, input_correlation, - row_cnt_1, - row_cnt_2, + left_row_cnt, + right_row_cnt, ) } else { DEFAULT_UNK_SEL }; - Self::cost( - (row_cnt_1 * row_cnt_2 * selectivity).max(1.0), - row_cnt_1 * row_cnt_2 * compute_cost + row_cnt_1, - 0.0, - ) + (left_row_cnt * right_row_cnt * selectivity).max(1.0) } - pub(super) fn get_hash_join_cost( + pub(crate) fn get_hash_join_row_cnt( &self, join_typ: JoinType, - children: &[Cost], + left_row_cnt: f64, + right_row_cnt: f64, context: Option, optimizer: Option<&CascadesOptimizer>, - ) -> Cost { - let (row_cnt_1, _, _) = Self::cost_tuple(&children[0]); - let (row_cnt_2, _, _) = Self::cost_tuple(&children[1]); + ) -> f64 { let selectivity = if let (Some(context), Some(optimizer)) = (context, optimizer) { let schema = optimizer.get_property_by_group::(context.group_id, 0); @@ -111,18 +101,14 @@ impl< &schema, column_refs, input_correlation, - row_cnt_1, - row_cnt_2, + left_row_cnt, + right_row_cnt, left_col_cnt, ) } else { DEFAULT_UNK_SEL }; - Self::cost( - (row_cnt_1 * row_cnt_2 * selectivity).max(1.0), - row_cnt_1 * 2.0 + row_cnt_2, - 0.0, - ) + (left_row_cnt * right_row_cnt * selectivity).max(1.0) } fn get_input_correlation( @@ -548,7 +534,7 @@ mod tests { use optd_core::rel_node::Value; - use crate::adv_cost::{tests::*, DEFAULT_EQ_SEL}; + use crate::adv_stats::{tests::*, DEFAULT_EQ_SEL}; use optd_datafusion_repr::{ plan_nodes::{BinOpType, JoinType, LogOpType, OptRelNodeRef}, properties::{ diff --git a/optd-datafusion-repr-adv-cost/src/adv_cost/limit.rs b/optd-datafusion-repr-adv-cost/src/adv_stats/limit.rs similarity index 62% rename from optd-datafusion-repr-adv-cost/src/adv_cost/limit.rs rename to optd-datafusion-repr-adv-cost/src/adv_stats/limit.rs index 09b6c4b0..cd1d9fd8 100644 --- a/optd-datafusion-repr-adv-cost/src/adv_cost/limit.rs +++ b/optd-datafusion-repr-adv-cost/src/adv_stats/limit.rs @@ -1,26 +1,23 @@ -use optd_core::{ - cascades::{CascadesOptimizer, RelNodeContext}, - cost::Cost, -}; +use optd_core::cascades::{CascadesOptimizer, RelNodeContext}; use serde::{de::DeserializeOwned, Serialize}; -use crate::adv_cost::stats::{Distribution, MostCommonValues}; +use crate::adv_stats::stats::{Distribution, MostCommonValues}; use optd_datafusion_repr::plan_nodes::{ConstantExpr, ConstantType, OptRelNode, OptRelNodeTyp}; -use super::{OptCostModel, DEFAULT_UNK_SEL}; +use super::{AdvStats, DEFAULT_UNK_SEL}; impl< M: MostCommonValues + Serialize + DeserializeOwned, D: Distribution + Serialize + DeserializeOwned, - > OptCostModel + > AdvStats { - pub(super) fn get_limit_cost( - children: &[Cost], + pub(crate) fn get_limit_row_cnt( + &self, + child_row_cnt: f64, context: Option, optimizer: Option<&CascadesOptimizer>, - ) -> Cost { - let (row_cnt, compute_cost, _) = Self::cost_tuple(&children[0]); - let row_cnt = if let (Some(context), Some(optimizer)) = (context, optimizer) { + ) -> f64 { + if let (Some(context), Some(optimizer)) = (context, optimizer) { let fetch_expr = optimizer .get_predicate_binding(context.children_group_ids[2]) .expect("no expression found?"); @@ -37,13 +34,12 @@ impl< .as_u64(); // u64::MAX represents None if fetch == u64::MAX { - row_cnt + child_row_cnt } else { - row_cnt.min(fetch as f64) + child_row_cnt.min(fetch as f64) } } else { - (row_cnt * DEFAULT_UNK_SEL).max(1.0) - }; - Self::cost(row_cnt, compute_cost, 0.0) + (child_row_cnt * DEFAULT_UNK_SEL).max(1.0) + } } } diff --git a/optd-datafusion-repr-adv-cost/src/adv_cost/stats.rs b/optd-datafusion-repr-adv-cost/src/adv_stats/stats.rs similarity index 100% rename from optd-datafusion-repr-adv-cost/src/adv_cost/stats.rs rename to optd-datafusion-repr-adv-cost/src/adv_stats/stats.rs diff --git a/optd-datafusion-repr-adv-cost/src/bin/test_optimize_adv.rs b/optd-datafusion-repr-adv-cost/src/bin/test_optimize_adv.rs deleted file mode 100644 index 91c5a358..00000000 --- a/optd-datafusion-repr-adv-cost/src/bin/test_optimize_adv.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use optd_core::{ - cascades::CascadesOptimizer, - heuristics::HeuristicsOptimizer, - optimizer::Optimizer, - rel_node::Value, - rules::{Rule, RuleWrapper}, -}; -use optd_datafusion_repr::{ - plan_nodes::{ - BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, JoinType, LogicalFilter, LogicalJoin, - LogicalScan, OptRelNode, OptRelNodeTyp, PlanNode, - }, - rules::{HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule}, -}; - -use optd_datafusion_repr_adv_cost::adv_cost::{stats::DataFusionPerTableStats, OptCostModel}; -use tracing::Level; - -pub fn main() { - tracing_subscriber::fmt() - .with_max_level(Level::DEBUG) - .with_ansi(false) - .with_target(false) - .init(); - - let rules: Vec>>> = vec![ - Arc::new(JoinCommuteRule::new()), - Arc::new(JoinAssocRule::new()), - Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Scan)), - Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Join( - JoinType::Inner, - ))), - Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Filter)), - Arc::new(HashJoinRule::new()), - ]; - let mut rule_wrappers = Vec::new(); - for rule in rules { - rule_wrappers.push(RuleWrapper::new_cascades(rule)); - } - - let mut optimizer = CascadesOptimizer::new( - rule_wrappers, - Box::new(OptCostModel::new( - [("t1", 1000), ("t2", 100), ("t3", 10000)] - .into_iter() - .map(|(x, y)| { - ( - x.to_string(), - DataFusionPerTableStats::new(y, HashMap::new()), - ) - }) - .collect(), - )), - vec![], - ); - - // The plan: (filter (scan t1) #1=2) join (scan t2) join (scan t3) - let scan1 = LogicalScan::new("t1".into()); - let filter_cond = BinOpExpr::new( - ColumnRefExpr::new(1).0, - ConstantExpr::new(Value::Int64(2)).0, - BinOpType::Eq, - ); - let filter1 = LogicalFilter::new(scan1.0, filter_cond.0); - let scan2 = LogicalScan::new("t2".into()); - let join_cond = ConstantExpr::new(Value::Bool(true)); - let scan3 = LogicalScan::new("t3".into()); - let join_filter = LogicalJoin::new(filter1.0, scan2.0, join_cond.clone().0, JoinType::Inner); - let fnal = LogicalJoin::new(scan3.0, join_filter.0, join_cond.0, JoinType::Inner); - let node = optimizer.optimize(fnal.0.clone().into_rel_node()); - optimizer.dump(None); - let node: Arc> = node.unwrap(); - println!( - "cost={}", - optimizer - .cost() - .explain(&optimizer.cost().compute_plan_node_cost(&node)) - ); - println!( - "{}", - PlanNode::from_rel_node(node) - .unwrap() - .explain_to_string(None) - ); - - let mut optimizer = HeuristicsOptimizer::new_with_rules( - vec![ - Arc::new(JoinCommuteRule::new()), - Arc::new(JoinAssocRule::new()), - Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Scan)), - Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Join( - JoinType::Inner, - ))), - Arc::new(PhysicalConversionRule::new(OptRelNodeTyp::Filter)), - Arc::new(HashJoinRule::new()), - ], - optd_core::heuristics::ApplyOrder::BottomUp, - Arc::new([]), - ); - let node = optimizer.optimize(fnal.0.into_rel_node()).unwrap(); - println!( - "{}", - PlanNode::from_rel_node(node) - .unwrap() - .explain_to_string(None) - ); -} diff --git a/optd-datafusion-repr-adv-cost/src/lib.rs b/optd-datafusion-repr-adv-cost/src/lib.rs index e6ddebf0..dcb31318 100644 --- a/optd-datafusion-repr-adv-cost/src/lib.rs +++ b/optd-datafusion-repr-adv-cost/src/lib.rs @@ -1,15 +1,144 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; -use adaptive_cost::{AdaptiveCostModel, RuntimeAdaptionStorage, DEFAULT_DECAY}; -use adv_cost::stats::DataFusionBaseTableStats; +use adv_stats::{ + stats::{DataFusionBaseTableStats, DataFusionDistribution, DataFusionMostCommonValues}, + AdvStats, +}; -use optd_datafusion_repr::{properties::schema::Catalog, DatafusionOptimizer}; +use optd_datafusion_repr::{ + cost::{adaptive_cost::RuntimeAdaptionStorageInner, OptCostModel, RuntimeAdaptionStorage}, + plan_nodes::OptRelNodeTyp, + properties::schema::Catalog, + DatafusionOptimizer, +}; -pub mod adaptive_cost; -pub mod adv_cost; +pub mod adv_stats; -pub trait WithRuntimeStatistics { - fn get_runtime_statistics(&self) -> RuntimeAdaptionStorage; +use std::collections::HashMap; + +use optd_core::{ + cascades::{CascadesOptimizer, RelNodeContext}, + cost::{Cost, CostModel, Statistics}, + rel_node::Value, +}; + +pub struct AdvancedCostModel { + base_model: OptCostModel, + stats: AdvStats, +} + +impl AdvancedCostModel { + pub fn new(stats: DataFusionBaseTableStats) -> Self { + let stats = AdvStats::new(stats); + let base_model = OptCostModel::new(HashMap::new()); + Self { base_model, stats } + } +} + +impl CostModel for AdvancedCostModel { + fn explain_cost(&self, cost: &Cost) -> String { + self.base_model.explain_cost(cost) + } + + fn explain_statistics(&self, cost: &Statistics) -> String { + self.base_model.explain_statistics(cost) + } + + fn accumulate(&self, total_cost: &mut Cost, cost: &Cost) { + self.base_model.accumulate(total_cost, cost) + } + + fn zero(&self) -> Cost { + self.base_model.zero() + } + + fn weighted_cost(&self, cost: &Cost) -> f64 { + self.base_model.weighted_cost(cost) + } + + fn compute_operation_cost( + &self, + node: &OptRelNodeTyp, + data: &Option, + children: &[Option<&Statistics>], + children_cost: &[Cost], + context: Option, + optimizer: Option<&CascadesOptimizer>, + ) -> Cost { + self.base_model.compute_operation_cost( + node, + data, + children, + children_cost, + context, + optimizer, + ) + } + + fn derive_statistics( + &self, + node: &OptRelNodeTyp, + data: &Option, + children: &[&Statistics], + context: Option, + optimizer: Option<&CascadesOptimizer>, + ) -> Statistics { + let row_cnts = children + .iter() + .map(|child| OptCostModel::row_cnt(child)) + .collect::>(); + match node { + OptRelNodeTyp::PhysicalScan => { + let table = data.as_ref().unwrap().as_str(); + let row_cnt = self + .stats + .per_table_stats_map + .get(table.as_ref()) + .map(|per_table_stats| per_table_stats.row_cnt) + .unwrap_or(1) as f64; + OptCostModel::stat(row_cnt) + } + OptRelNodeTyp::PhysicalLimit => { + let row_cnt = self + .stats + .get_limit_row_cnt(row_cnts[0], context, optimizer); + OptCostModel::stat(row_cnt) + } + OptRelNodeTyp::PhysicalFilter => { + let row_cnt = self + .stats + .get_filter_row_cnt(row_cnts[0], context, optimizer); + OptCostModel::stat(row_cnt) + } + OptRelNodeTyp::PhysicalNestedLoopJoin(join_typ) => { + let row_cnt = self.stats.get_nlj_row_cnt( + *join_typ, + row_cnts[0], + row_cnts[1], + context, + optimizer, + ); + OptCostModel::stat(row_cnt) + } + OptRelNodeTyp::PhysicalHashJoin(join_typ) => { + let row_cnt = self.stats.get_hash_join_row_cnt( + *join_typ, + row_cnts[0], + row_cnts[1], + context, + optimizer, + ); + OptCostModel::stat(row_cnt) + } + OptRelNodeTyp::PhysicalAgg => { + let row_cnt = self.stats.get_agg_row_cnt(context, optimizer, row_cnts[0]); + OptCostModel::stat(row_cnt) + } + _ => self + .base_model + .derive_statistics(node, data, children, context, optimizer), + } + } } pub fn new_physical_adv_cost( @@ -17,8 +146,10 @@ pub fn new_physical_adv_cost( stats: DataFusionBaseTableStats, enable_adaptive: bool, ) -> DatafusionOptimizer { - let cost_model = AdaptiveCostModel::new(DEFAULT_DECAY, stats); - let runtime_map = cost_model.get_runtime_map(); + let cost_model = AdvancedCostModel::new(stats); + // This cost model does not accept adaptive (runtime) statistics. + let runtime_map = + RuntimeAdaptionStorage::new(Mutex::new(RuntimeAdaptionStorageInner::default())); DatafusionOptimizer::new_physical_with_cost_model( catalog, enable_adaptive, diff --git a/optd-datafusion-repr/Cargo.toml b/optd-datafusion-repr/Cargo.toml index 95091b0d..db80f99a 100644 --- a/optd-datafusion-repr/Cargo.toml +++ b/optd-datafusion-repr/Cargo.toml @@ -18,3 +18,4 @@ datafusion-expr = "32.0.0" serde = { version = "1.0", features = ["derive"] } bincode = "1.3.3" union-find = { git = "https://github.com/Gun9niR/union-find-rs.git", rev = "794821514f7daefcbb8d5f38ef04e62fc18b5665" } +value-bag = { version = "1", features = ["owned"] } diff --git a/optd-datafusion-repr/src/bin/test_optimize.rs b/optd-datafusion-repr/src/bin/test_optimize.rs index e51caed2..583fda51 100644 --- a/optd-datafusion-repr/src/bin/test_optimize.rs +++ b/optd-datafusion-repr/src/bin/test_optimize.rs @@ -65,14 +65,8 @@ pub fn main() { let join_filter = LogicalJoin::new(filter1.0, scan2.0, join_cond.clone().0, JoinType::Inner); let fnal = LogicalJoin::new(scan3.0, join_filter.0, join_cond.0, JoinType::Inner); let node = optimizer.optimize(fnal.0.clone().into_rel_node()); - optimizer.dump(None); + optimizer.dump(); let node: Arc> = node.unwrap(); - println!( - "cost={}", - optimizer - .cost() - .explain(&optimizer.cost().compute_plan_node_cost(&node)) - ); println!( "{}", PlanNode::from_rel_node(node) diff --git a/optd-datafusion-repr/src/cost.rs b/optd-datafusion-repr/src/cost.rs index 91481808..9a0c077d 100644 --- a/optd-datafusion-repr/src/cost.rs +++ b/optd-datafusion-repr/src/cost.rs @@ -2,4 +2,4 @@ pub mod adaptive_cost; pub mod base_cost; pub use adaptive_cost::{AdaptiveCostModel, RuntimeAdaptionStorage}; -pub use base_cost::{OptCostModel, COMPUTE_COST, IO_COST, ROW_COUNT}; +pub use base_cost::{OptCostModel, COMPUTE_COST, IO_COST}; diff --git a/optd-datafusion-repr/src/cost/adaptive_cost.rs b/optd-datafusion-repr/src/cost/adaptive_cost.rs index 77c0cd4d..c0be863a 100644 --- a/optd-datafusion-repr/src/cost/adaptive_cost.rs +++ b/optd-datafusion-repr/src/cost/adaptive_cost.rs @@ -6,8 +6,8 @@ use std::{ use crate::{cost::OptCostModel, plan_nodes::OptRelNodeTyp}; use optd_core::{ cascades::{CascadesOptimizer, GroupId, RelNodeContext}, - cost::{Cost, CostModel}, - rel_node::{RelNode, Value}, + cost::{Cost, CostModel, Statistics}, + rel_node::Value, }; use super::base_cost::DEFAULT_TABLE_ROW_CNT; @@ -26,9 +26,27 @@ pub struct AdaptiveCostModel { decay: usize, } +impl AdaptiveCostModel { + fn get_row_cnt(&self, _: &Option, context: &Option) -> f64 { + let guard = self.runtime_row_cnt.lock().unwrap(); + if let Some((runtime_row_cnt, iter)) = + guard.history.get(&context.as_ref().unwrap().group_id) + { + if *iter + self.decay >= guard.iter_cnt { + return (*runtime_row_cnt).max(1) as f64; + } + } + DEFAULT_TABLE_ROW_CNT as f64 + } +} + impl CostModel for AdaptiveCostModel { - fn explain(&self, cost: &Cost) -> String { - self.base_model.explain(cost) + fn explain_cost(&self, cost: &Cost) -> String { + self.base_model.explain_cost(cost) + } + + fn explain_statistics(&self, cost: &Statistics) -> String { + self.base_model.explain_statistics(cost) } fn accumulate(&self, total_cost: &mut Cost, cost: &Cost) { @@ -39,46 +57,47 @@ impl CostModel for AdaptiveCostModel { self.base_model.zero() } - fn compute_cost( + fn weighted_cost(&self, cost: &Cost) -> f64 { + self.base_model.weighted_cost(cost) + } + + fn compute_operation_cost( &self, node: &OptRelNodeTyp, data: &Option, - children: &[Cost], + children: &[Option<&Statistics>], + children_cost: &[Cost], context: Option, - _optimizer: Option<&CascadesOptimizer>, + optimizer: Option<&CascadesOptimizer>, ) -> Cost { if let OptRelNodeTyp::PhysicalScan = node { - let guard = self.runtime_row_cnt.lock().unwrap(); - if let Some((runtime_row_cnt, iter)) = guard.history.get(&context.unwrap().group_id) { - if *iter + self.decay >= guard.iter_cnt { - let runtime_row_cnt = (*runtime_row_cnt).max(1) as f64; - return OptCostModel::cost(runtime_row_cnt, 0.0, runtime_row_cnt); - } else { - return OptCostModel::cost(DEFAULT_TABLE_ROW_CNT as f64, 0.0, 1.0); - } - } else { - return OptCostModel::cost(DEFAULT_TABLE_ROW_CNT as f64, 0.0, 1.0); - } - } - let (mut row_cnt, compute_cost, io_cost) = OptCostModel::cost_tuple( - &self - .base_model - .compute_cost(node, data, children, None, None), - ); - if let Some(context) = context { - let guard = self.runtime_row_cnt.lock().unwrap(); - if let Some((runtime_row_cnt, iter)) = guard.history.get(&context.group_id) { - if *iter + self.decay >= guard.iter_cnt { - let runtime_row_cnt = (*runtime_row_cnt).max(1) as f64; - row_cnt = runtime_row_cnt; - } - } + let row_cnt = self.get_row_cnt(data, &context); + return OptCostModel::cost(0.0, row_cnt); } - OptCostModel::cost(row_cnt, compute_cost, io_cost) + self.base_model.compute_operation_cost( + node, + data, + children, + children_cost, + context, + optimizer, + ) } - fn compute_plan_node_cost(&self, node: &RelNode) -> Cost { - self.base_model.compute_plan_node_cost(node) + fn derive_statistics( + &self, + node: &OptRelNodeTyp, + data: &Option, + children: &[&Statistics], + context: Option, + optimizer: Option<&CascadesOptimizer>, + ) -> Statistics { + if let OptRelNodeTyp::PhysicalScan = node { + let row_cnt = self.get_row_cnt(data, &context); + return OptCostModel::stat(row_cnt); + } + self.base_model + .derive_statistics(node, data, children, context, optimizer) } } diff --git a/optd-datafusion-repr/src/cost/base_cost.rs b/optd-datafusion-repr/src/cost/base_cost.rs index 608f0508..52bcba52 100644 --- a/optd-datafusion-repr/src/cost/base_cost.rs +++ b/optd-datafusion-repr/src/cost/base_cost.rs @@ -4,40 +4,21 @@ use crate::plan_nodes::OptRelNodeTyp; use itertools::Itertools; use optd_core::{ cascades::{CascadesOptimizer, RelNodeContext}, - cost::{Cost, CostModel}, - rel_node::{RelNode, RelNodeTyp, Value}, + cost::{Cost, CostModel, Statistics}, + rel_node::Value, }; - -fn compute_plan_node_cost>( - model: &C, - node: &RelNode, - total_cost: &mut Cost, -) -> Cost { - let children = node - .children - .iter() - .map(|child| compute_plan_node_cost(model, child, total_cost)) - .collect_vec(); - let cost = model.compute_cost(&node.typ, &node.data, &children, None, None); - model.accumulate(total_cost, &cost); - cost -} +use value_bag::ValueBag; pub struct OptCostModel { table_stat: HashMap, } -pub const ROW_COUNT: usize = 1; -pub const COMPUTE_COST: usize = 2; -pub const IO_COST: usize = 3; +pub const COMPUTE_COST: usize = 0; +pub const IO_COST: usize = 1; pub(crate) const DEFAULT_TABLE_ROW_CNT: usize = 1000; impl OptCostModel { - pub fn row_cnt(Cost(cost): &Cost) -> f64 { - cost[ROW_COUNT] - } - pub fn compute_cost(Cost(cost): &Cost) -> f64 { cost[COMPUTE_COST] } @@ -46,151 +27,181 @@ impl OptCostModel { cost[IO_COST] } - pub fn cost_tuple(Cost(cost): &Cost) -> (f64, f64, f64) { - (cost[ROW_COUNT], cost[COMPUTE_COST], cost[IO_COST]) + pub fn row_cnt(Statistics(stat): &Statistics) -> f64 { + stat.by_ref().as_f64() } - pub fn weighted_cost(row_cnt: f64, compute_cost: f64, io_cost: f64) -> f64 { - let _ = row_cnt; - compute_cost + io_cost + pub fn cost(compute_cost: f64, io_cost: f64) -> Cost { + Cost(vec![compute_cost, io_cost]) } - pub fn cost(row_cnt: f64, compute_cost: f64, io_cost: f64) -> Cost { - Cost(vec![ - Self::weighted_cost(row_cnt, compute_cost, io_cost), - row_cnt, - compute_cost, - io_cost, - ]) + pub fn stat(row_cnt: f64) -> Statistics { + Statistics(ValueBag::from_f64(row_cnt).to_owned()) + } + + pub fn cost_tuple(Cost(cost): &Cost) -> (f64, f64) { + (cost[COMPUTE_COST], cost[IO_COST]) + } + + fn get_row_cnt(&self, data: &Option) -> f64 { + let table_name = data.as_ref().unwrap().as_str(); + self.table_stat + .get(table_name.as_ref()) + .copied() + .unwrap_or(DEFAULT_TABLE_ROW_CNT) as f64 } } impl CostModel for OptCostModel { - fn explain(&self, cost: &Cost) -> String { + fn explain_cost(&self, cost: &Cost) -> String { format!( - "weighted={},row_cnt={},compute={},io={}", - cost.0[0], - Self::row_cnt(cost), + "{{compute={},io={}}}", Self::compute_cost(cost), Self::io_cost(cost) ) } + fn explain_statistics(&self, stat: &Statistics) -> String { + format!("{{row_cnt={}}}", Self::row_cnt(stat)) + } + fn accumulate(&self, total_cost: &mut Cost, cost: &Cost) { - // do not accumulate row count total_cost.0[COMPUTE_COST] += Self::compute_cost(cost); total_cost.0[IO_COST] += Self::io_cost(cost); - total_cost.0[0] = Self::weighted_cost( - total_cost.0[ROW_COUNT], - total_cost.0[COMPUTE_COST], - total_cost.0[IO_COST], - ); } fn zero(&self) -> Cost { - Self::cost(0.0, 0.0, 0.0) + Cost(vec![0.0, 0.0]) } - fn compute_cost( + fn derive_statistics( &self, node: &OptRelNodeTyp, data: &Option, - children: &[Cost], + children: &[&Statistics], _context: Option, _optimizer: Option<&CascadesOptimizer>, - ) -> Cost { + ) -> Statistics { match node { OptRelNodeTyp::PhysicalScan => { - let table_name = data.as_ref().unwrap().as_str(); - let row_cnt = self - .table_stat - .get(table_name.as_ref()) - .copied() - .unwrap_or(DEFAULT_TABLE_ROW_CNT) as f64; - Self::cost(row_cnt, 0.0, row_cnt) + let row_cnt = self.get_row_cnt(data); + Self::stat(row_cnt) } OptRelNodeTyp::PhysicalLimit => { - let (row_cnt, compute_cost, _) = Self::cost_tuple(&children[0]); + let row_cnt = Self::row_cnt(children[0]); let selectivity = 0.001; - Self::cost((row_cnt * selectivity).max(1.0), compute_cost, 0.0) + Self::stat((row_cnt * selectivity).max(1.0)) } - OptRelNodeTyp::PhysicalEmptyRelation => Self::cost(0.5, 0.01, 0.0), + OptRelNodeTyp::PhysicalEmptyRelation => Self::stat(0.01), OptRelNodeTyp::PhysicalFilter => { - let (row_cnt, _, _) = Self::cost_tuple(&children[0]); - let (_, compute_cost, _) = Self::cost_tuple(&children[1]); + let row_cnt = Self::row_cnt(children[0]); let selectivity = 0.001; - Self::cost( - (row_cnt * selectivity).max(1.0), - row_cnt * compute_cost, - 0.0, - ) + Self::stat((row_cnt * selectivity).max(1.0)) } OptRelNodeTyp::PhysicalNestedLoopJoin(_) => { - let (row_cnt_1, _, _) = Self::cost_tuple(&children[0]); - let (row_cnt_2, _, _) = Self::cost_tuple(&children[1]); - let (_, compute_cost, _) = Self::cost_tuple(&children[2]); + let row_cnt_1 = Self::row_cnt(children[0]); + let row_cnt_2 = Self::row_cnt(children[1]); let selectivity = 0.01; - Self::cost( - (row_cnt_1 * row_cnt_2 * selectivity).max(1.0), - row_cnt_1 * row_cnt_2 * compute_cost + row_cnt_1, - 0.0, - ) + Self::stat((row_cnt_1 * row_cnt_2 * selectivity).max(1.0)) + } + OptRelNodeTyp::PhysicalHashJoin(_) => { + let row_cnt_1 = Self::row_cnt(children[0]); + let row_cnt_2 = Self::row_cnt(children[1]); + Self::stat(row_cnt_1.min(row_cnt_2).max(1.0)) + } + OptRelNodeTyp::PhysicalSort + | OptRelNodeTyp::PhysicalAgg + | OptRelNodeTyp::PhysicalProjection => { + let row_cnt = Self::row_cnt(children[0]); + Self::stat(row_cnt) + } + OptRelNodeTyp::List => Self::stat(1.0), + _ if node.is_expression() => Self::stat(1.0), + x => unimplemented!("cannot derive statistics for {}", x), + } + } + + fn compute_operation_cost( + &self, + node: &OptRelNodeTyp, + data: &Option, + children: &[Option<&Statistics>], + children_cost: &[Cost], + _context: Option, + _optimizer: Option<&CascadesOptimizer>, + ) -> Cost { + let row_cnts = children + .iter() + .map(|child| child.map(Self::row_cnt).unwrap_or(0 as f64)) + .collect_vec(); + match node { + OptRelNodeTyp::PhysicalScan => { + let row_cnt = self.get_row_cnt(data); + Self::cost(0.0, row_cnt) + } + OptRelNodeTyp::PhysicalLimit => { + let row_cnt = row_cnts[0]; + Self::cost(row_cnt, 0.0) + } + OptRelNodeTyp::PhysicalEmptyRelation => Self::cost(0.01, 0.0), + OptRelNodeTyp::PhysicalFilter => { + let row_cnt = row_cnts[0]; + let (compute_cost, _) = Self::cost_tuple(&children_cost[1]); + Self::cost(row_cnt * compute_cost, 0.0) + } + OptRelNodeTyp::PhysicalNestedLoopJoin(_) => { + let row_cnt_1 = row_cnts[0]; + let row_cnt_2 = row_cnts[1]; + let (compute_cost, _) = Self::cost_tuple(&children_cost[2]); + Self::cost(row_cnt_1 * row_cnt_2 * compute_cost + row_cnt_1, 0.0) } OptRelNodeTyp::PhysicalProjection => { - let (row_cnt, _, _) = Self::cost_tuple(&children[0]); - let (_, compute_cost, _) = Self::cost_tuple(&children[1]); - Self::cost(row_cnt, compute_cost * row_cnt, 0.0) + let row_cnt = row_cnts[0]; + let (compute_cost, _) = Self::cost_tuple(&children_cost[1]); + Self::cost(row_cnt * compute_cost, 0.0) } OptRelNodeTyp::PhysicalHashJoin(_) => { - let (row_cnt_1, _, _) = Self::cost_tuple(&children[0]); - let (row_cnt_2, _, _) = Self::cost_tuple(&children[1]); - Self::cost( - row_cnt_1.min(row_cnt_2).max(1.0), - row_cnt_1 * 2.0 + row_cnt_2, - 0.0, - ) + let row_cnt_1 = row_cnts[0]; + let row_cnt_2 = row_cnts[1]; + Self::cost(row_cnt_1 * 2.0 + row_cnt_2, 0.0) } - OptRelNodeTyp::PhysicalSort => { - let (row_cnt, _, _) = Self::cost_tuple(&children[0]); - Self::cost(row_cnt, row_cnt * row_cnt.ln_1p().max(1.0), 0.0) + let row_cnt = row_cnts[0]; + Self::cost(row_cnt * row_cnt.ln_1p().max(1.0), 0.0) } OptRelNodeTyp::PhysicalAgg => { - let (row_cnt, _, _) = Self::cost_tuple(&children[0]); - let (_, compute_cost_1, _) = Self::cost_tuple(&children[1]); - let (_, compute_cost_2, _) = Self::cost_tuple(&children[2]); - Self::cost(row_cnt, row_cnt * (compute_cost_1 + compute_cost_2), 0.0) + let row_cnt = row_cnts[0]; + let (compute_cost_1, _) = Self::cost_tuple(&children_cost[1]); + let (compute_cost_2, _) = Self::cost_tuple(&children_cost[2]); + Self::cost(row_cnt * (compute_cost_1 + compute_cost_2), 0.0) } + // List and expressions are computed in the same way -- but list has much fewer cost OptRelNodeTyp::List => { - let compute_cost = children + let compute_cost = children_cost .iter() .map(|child| { - let (_, compute_cost, _) = Self::cost_tuple(child); + let (compute_cost, _) = Self::cost_tuple(child); compute_cost }) .sum::(); - Self::cost(1.0, compute_cost + 0.01, 0.0) + Self::cost(compute_cost + 0.01, 0.0) } - OptRelNodeTyp::ColumnRef => Self::cost(1.0, 0.01, 0.0), _ if node.is_expression() => { - let compute_cost = children + let compute_cost = children_cost .iter() .map(|child| { - let (_, compute_cost, _) = Self::cost_tuple(child); + let (compute_cost, _) = Self::cost_tuple(child); compute_cost }) .sum::(); - Self::cost(1.0, compute_cost + 1.0, 0.0) + Self::cost(compute_cost + 1.0, 0.0) } x => unimplemented!("cannot compute cost for {}", x), } } - fn compute_plan_node_cost(&self, node: &RelNode) -> Cost { - let mut cost = self.zero(); - let top = compute_plan_node_cost(self, node, &mut cost); - cost.0[ROW_COUNT] = top.0[ROW_COUNT]; - cost + fn weighted_cost(&self, cost: &Cost) -> f64 { + Self::compute_cost(cost) + Self::io_cost(cost) } } diff --git a/optd-datafusion-repr/src/explain.rs b/optd-datafusion-repr/src/explain.rs index daaaeded..9b2805ce 100644 --- a/optd-datafusion-repr/src/explain.rs +++ b/optd-datafusion-repr/src/explain.rs @@ -1,8 +1,6 @@ use optd_core::rel_node::RelNodeMeta; use pretty_xmlish::Pretty; -use crate::cost::{COMPUTE_COST, IO_COST, ROW_COUNT}; - pub trait Insertable<'a> { fn with_meta(self, meta: &RelNodeMeta) -> Self; } @@ -10,16 +8,8 @@ pub trait Insertable<'a> { impl<'a> Insertable<'a> for Vec<(&'a str, Pretty<'a>)> { // FIXME: this assumes we are using OptCostModel fn with_meta(mut self, meta: &RelNodeMeta) -> Self { - self.push(( - "cost", - Pretty::display(&format!( - "weighted={:.2},row_cnt={:.2},compute={:.2},io={:.2}", - meta.cost.0[0], - meta.cost.0[ROW_COUNT], - meta.cost.0[COMPUTE_COST], - meta.cost.0[IO_COST], - )), - )); + self.push(("cost", Pretty::display(&meta.cost_display))); + self.push(("stat", Pretty::display(&meta.stat_display))); self } } diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index e7634ac1..0df8efe7 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -260,8 +260,4 @@ impl DatafusionOptimizer { Ok((group_id, optimized_rel, meta.unwrap())) } - - pub fn dump(&self, group_id: Option) { - self.cascades_optimizer.dump(group_id) - } } diff --git a/optd-datafusion-repr/src/testing/dummy_cost.rs b/optd-datafusion-repr/src/testing/dummy_cost.rs index ea00fcb1..1c933e75 100644 --- a/optd-datafusion-repr/src/testing/dummy_cost.rs +++ b/optd-datafusion-repr/src/testing/dummy_cost.rs @@ -1,35 +1,56 @@ use crate::plan_nodes::OptRelNodeTyp; use optd_core::{ cascades::{CascadesOptimizer, RelNodeContext}, - cost::{Cost, CostModel}, - rel_node::{RelNode, Value}, + cost::{Cost, CostModel, Statistics}, + rel_node::Value, }; +use value_bag::ValueBag; /// Dummy cost model that returns a 0 cost in all cases. /// Intended for testing with the cascades optimizer. pub struct DummyCostModel; impl CostModel for DummyCostModel { - fn compute_cost( + /// Compute the cost of a single operation + fn compute_operation_cost( &self, - _node: &OptRelNodeTyp, - _data: &Option, - _children: &[Cost], - _context: Option, - _optimizer: Option<&CascadesOptimizer>, + _: &OptRelNodeTyp, + _: &Option, + _: &[Option<&Statistics>], + _: &[Cost], + _: Option, + _: Option<&CascadesOptimizer>, ) -> Cost { - Cost(vec![0.0]) + Cost(vec![1.0]) } - fn compute_plan_node_cost(&self, _node: &RelNode) -> Cost { - Cost(vec![0.0]) + /// Derive the statistics of a single operation + fn derive_statistics( + &self, + _: &OptRelNodeTyp, + _: &Option, + _: &[&Statistics], + _: Option, + _: Option<&CascadesOptimizer>, + ) -> Statistics { + Statistics(ValueBag::empty().to_owned()) + } + + fn explain_cost(&self, _: &Cost) -> String { + "dummy_cost".to_string() + } + + fn explain_statistics(&self, _: &Statistics) -> String { + "dummy_statistics".to_string() } - fn explain(&self, _node: &Cost) -> String { - "Dummy cost".to_string() + fn weighted_cost(&self, cost: &Cost) -> f64 { + cost.0[0] } - fn accumulate(&self, _total_cost: &mut Cost, _cost: &Cost) {} + fn accumulate(&self, total_cost: &mut Cost, cost: &Cost) { + total_cost.0[0] += cost.0[0]; + } fn zero(&self) -> Cost { Cost(vec![0.0]) diff --git a/optd-perfbench/src/datafusion_dbms.rs b/optd-perfbench/src/datafusion_dbms.rs index 69059ed6..ed075edb 100644 --- a/optd-perfbench/src/datafusion_dbms.rs +++ b/optd-perfbench/src/datafusion_dbms.rs @@ -28,7 +28,7 @@ use lazy_static::lazy_static; use optd_datafusion_bridge::{DatafusionCatalog, OptdQueryPlanner}; use optd_datafusion_repr::DatafusionOptimizer; use optd_datafusion_repr_adv_cost::{ - adv_cost::stats::{DataFusionBaseTableStats, DataFusionPerTableStats}, + adv_stats::stats::{DataFusionBaseTableStats, DataFusionPerTableStats}, new_physical_adv_cost, }; use parquet::arrow::arrow_reader::{ diff --git a/optd-sqlplannertest/Cargo.toml b/optd-sqlplannertest/Cargo.toml index f781aee7..f4626021 100644 --- a/optd-sqlplannertest/Cargo.toml +++ b/optd-sqlplannertest/Cargo.toml @@ -11,6 +11,7 @@ anyhow = { version = "1", features = ["backtrace"] } sqlplannertest = { git = "https://github.com/risinglightdb/sqlplannertest-rs", branch = "main" } async-trait = "0.1" datafusion-optd-cli = { path = "../datafusion-optd-cli" } +optd-datafusion-repr-adv-cost = { path = "../optd-datafusion-repr-adv-cost" } datafusion = { version = "32.0.0", features = [ "avro", "crypto_expressions", diff --git a/optd-sqlplannertest/src/bin/planner_test_apply.rs b/optd-sqlplannertest/src/bin/planner_test_apply.rs index be36613a..829186ca 100644 --- a/optd-sqlplannertest/src/bin/planner_test_apply.rs +++ b/optd-sqlplannertest/src/bin/planner_test_apply.rs @@ -9,6 +9,8 @@ use clap::Parser; struct Cli { /// Optional list of directories to apply the test; if empty, apply all tests directories: Vec, + #[clap(long)] + enable_advanced_cost_model: bool, } #[tokio::main] @@ -17,11 +19,18 @@ async fn main() -> Result<()> { let cli = Cli::parse(); + let enable_advanced_cost_model = cli.enable_advanced_cost_model; if cli.directories.is_empty() { println!("Running all tests"); sqlplannertest::planner_test_apply( Path::new(env!("CARGO_MANIFEST_DIR")).join("tests"), - || async { optd_sqlplannertest::DatafusionDBMS::new().await }, + move || async move { + if enable_advanced_cost_model { + optd_sqlplannertest::DatafusionDBMS::new_advanced_cost().await + } else { + optd_sqlplannertest::DatafusionDBMS::new().await + } + }, ) .await?; } else { @@ -31,7 +40,13 @@ async fn main() -> Result<()> { Path::new(env!("CARGO_MANIFEST_DIR")) .join("tests") .join(directory), - || async { optd_sqlplannertest::DatafusionDBMS::new().await }, + move || async move { + if enable_advanced_cost_model { + optd_sqlplannertest::DatafusionDBMS::new_advanced_cost().await + } else { + optd_sqlplannertest::DatafusionDBMS::new().await + } + }, ) .await?; } diff --git a/optd-sqlplannertest/src/lib.rs b/optd-sqlplannertest/src/lib.rs index 2a3a7af5..0934caa9 100644 --- a/optd-sqlplannertest/src/lib.rs +++ b/optd-sqlplannertest/src/lib.rs @@ -11,6 +11,8 @@ use lazy_static::lazy_static; use mimalloc::MiMalloc; use optd_datafusion_bridge::{DatafusionCatalog, OptdQueryPlanner}; use optd_datafusion_repr::DatafusionOptimizer; +use optd_datafusion_repr_adv_cost::adv_stats::stats::DataFusionBaseTableStats; +use optd_datafusion_repr_adv_cost::new_physical_adv_cost; use regex::Regex; use std::collections::HashSet; use std::sync::Arc; @@ -32,9 +34,20 @@ pub struct DatafusionDBMS { impl DatafusionDBMS { pub async fn new() -> Result { - let (ctx, optd_optimizer) = DatafusionDBMS::new_session_ctx(false, None).await?; + let (ctx, optd_optimizer) = DatafusionDBMS::new_session_ctx(false, None, false).await?; let (use_df_logical_ctx, _) = - DatafusionDBMS::new_session_ctx(true, Some(ctx.state().catalog_list().clone())).await?; + Self::new_session_ctx(true, Some(ctx.state().catalog_list().clone()), false).await?; + Ok(Self { + ctx, + use_df_logical_ctx, + optd_optimizer: Some(optd_optimizer), + }) + } + + pub async fn new_advanced_cost() -> Result { + let (ctx, optd_optimizer) = DatafusionDBMS::new_session_ctx(false, None, true).await?; + let (use_df_logical_ctx, _) = + Self::new_session_ctx(true, Some(ctx.state().catalog_list().clone()), true).await?; Ok(Self { ctx, use_df_logical_ctx, @@ -46,6 +59,7 @@ impl DatafusionDBMS { async fn new_session_ctx( use_df_logical: bool, catalog: Option>, + with_advanced_cost: bool, ) -> Result<(SessionContext, Arc)> { let mut session_config = SessionConfig::from_env()?.with_information_schema(true); if !use_df_logical { @@ -66,10 +80,18 @@ impl DatafusionDBMS { } else { SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env)) }; - let optimizer: DatafusionOptimizer = DatafusionOptimizer::new_physical( - Arc::new(DatafusionCatalog::new(state.catalog_list())), - false, - ); + let optimizer = if with_advanced_cost { + new_physical_adv_cost( + Arc::new(DatafusionCatalog::new(state.catalog_list())), + DataFusionBaseTableStats::default(), + false, + ) + } else { + DatafusionOptimizer::new_physical( + Arc::new(DatafusionCatalog::new(state.catalog_list())), + false, + ) + }; if !use_df_logical { // clean up optimizer rules so that we can plug in our own optimizer state = state.with_optimizer_rules(vec![]); diff --git a/optd-sqlplannertest/tests/basic/verbose.planner.sql b/optd-sqlplannertest/tests/basic/verbose.planner.sql index 00b4169c..6303b843 100644 --- a/optd-sqlplannertest/tests/basic/verbose.planner.sql +++ b/optd-sqlplannertest/tests/basic/verbose.planner.sql @@ -17,7 +17,7 @@ PhysicalScan { table: t1 } select * from t1; /* -PhysicalScan { table: t1, cost: weighted=1.00,row_cnt=1000.00,compute=0.00,io=1.00 } +PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } */ -- Test verbose explain with aggregation @@ -28,7 +28,8 @@ PhysicalAgg ├── aggrs:Agg(Count) │ └── [ 1(u8) ] ├── groups: [] -├── cost: weighted=10071.06,row_cnt=1000.00,compute=10070.06,io=1.00 -└── PhysicalScan { table: t1, cost: weighted=1.00,row_cnt=1000.00,compute=0.00,io=1.00 } +├── cost: {compute=10070.059999999998,io=1000} +├── stat: {row_cnt=1000} +└── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } */ diff --git a/optd-sqlplannertest/tests/joins/join_enumerate.planner.sql b/optd-sqlplannertest/tests/joins/join_enumerate.planner.sql index 426dfa65..4b7c6fba 100644 --- a/optd-sqlplannertest/tests/joins/join_enumerate.planner.sql +++ b/optd-sqlplannertest/tests/joins/join_enumerate.planner.sql @@ -12,7 +12,19 @@ insert into t3 values (0, 300), (1, 301), (2, 302); 3 */ --- Test whether the optimizer enumerates all join orders. +-- Test whether the optimizer enumerates all 2-join orders. +select * from t2, t1 where t1v1 = t2v1; + +/* +(Join t1 t2) +(Join t2 t1) + +0 200 0 0 +1 201 1 1 +2 202 2 2 +*/ + +-- Test whether the optimizer enumerates all 3-join orders. select * from t2, t1, t3 where t1v1 = t2v1 and t1v2 = t3v2; /* @@ -30,7 +42,7 @@ select * from t2, t1, t3 where t1v1 = t2v1 and t1v2 = t3v2; 2 202 2 2 2 302 */ --- Test whether the optimizer enumerates all join orders. +-- Test whether the optimizer enumerates all 3-join orders. select * from t1, t2, t3 where t1v1 = t2v1 and t1v2 = t3v2; /* diff --git a/optd-sqlplannertest/tests/joins/join_enumerate.yml b/optd-sqlplannertest/tests/joins/join_enumerate.yml index 6040d312..9417777c 100644 --- a/optd-sqlplannertest/tests/joins/join_enumerate.yml +++ b/optd-sqlplannertest/tests/joins/join_enumerate.yml @@ -7,15 +7,21 @@ insert into t3 values (0, 300), (1, 301), (2, 302); tasks: - execute +- sql: | + select * from t2, t1 where t1v1 = t2v1; + desc: Test whether the optimizer enumerates all 2-join orders. + tasks: + - explain:logical_join_orders + - execute - sql: | select * from t2, t1, t3 where t1v1 = t2v1 and t1v2 = t3v2; - desc: Test whether the optimizer enumerates all join orders. + desc: Test whether the optimizer enumerates all 3-join orders. tasks: - explain:logical_join_orders - execute - sql: | select * from t1, t2, t3 where t1v1 = t2v1 and t1v2 = t3v2; - desc: Test whether the optimizer enumerates all join orders. + desc: Test whether the optimizer enumerates all 3-join orders. tasks: - explain:logical_join_orders - execute diff --git a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql index ce52ee3e..dceb756d 100644 --- a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql +++ b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql @@ -53,21 +53,25 @@ LogicalProjection { exprs: [ #0, #1 ] } ├── LogicalAgg { exprs: [], groups: [ #0 ] } │ └── LogicalScan { table: t1 } └── LogicalScan { table: t2 } -PhysicalProjection { exprs: [ #2, #3 ] } -└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } +PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=9147.220000000001,io=3000}, stat: {row_cnt=1} } +└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=9139.2,io=3000}, stat: {row_cnt=1} } ├── PhysicalAgg │ ├── aggrs:Agg(Sum) │ │ └── [ Cast { cast_to: Int64, expr: #2 } ] │ ├── groups: [ #1 ] - │ └── PhysicalProjection { exprs: [ #2, #0, #1 ] } - │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + │ ├── cost: {compute=8133.180000000001,io=2000} + │ ├── stat: {row_cnt=1} + │ └── PhysicalProjection { exprs: [ #2, #0, #1 ], cost: {compute=8045.06,io=2000}, stat: {row_cnt=1} } + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=8033.04,io=2000}, stat: {row_cnt=1} } │ ├── PhysicalFilter │ │ ├── cond:Gt │ │ │ ├── #0 │ │ │ └── 100(i64) - │ │ └── PhysicalScan { table: t2 } - │ └── PhysicalAgg { aggrs: [], groups: [ #0 ] } - │ └── PhysicalScan { table: t1 } - └── PhysicalScan { table: t1 } + │ │ ├── cost: {compute=5005,io=1000} + │ │ ├── stat: {row_cnt=1} + │ │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=2022.0199999999995,io=1000}, stat: {row_cnt=1000} } + │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } + └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} } */ diff --git a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.yml b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.yml index c41a28b0..8d5d574a 100644 --- a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.yml +++ b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.yml @@ -13,4 +13,4 @@ select * from t1 where (select sum(t2v3) from t2 where t2v1 = t1v1) > 100; desc: Test whether the optimizer can unnest correlated subqueries. tasks: - - explain:logical_optd,optimized_logical_optd,physical_optd + - explain[verbose]:logical_optd,optimized_logical_optd,physical_optd diff --git a/optd-sqlplannertest/tests/tpch/tpch-06-10.planner.sql b/optd-sqlplannertest/tests/tpch/tpch-06-10.planner.sql index 6c4bcf59..adc524ed 100644 --- a/optd-sqlplannertest/tests/tpch/tpch-06-10.planner.sql +++ b/optd-sqlplannertest/tests/tpch/tpch-06-10.planner.sql @@ -460,23 +460,22 @@ PhysicalSort │ └── #54 └── PhysicalHashJoin { join_type: Inner, left_keys: [ #51 ], right_keys: [ #0 ] } ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #12 ], right_keys: [ #0 ] } - │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #44 ], right_keys: [ #0 ] } - │ │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #33 ], right_keys: [ #0 ] } - │ │ │ ├── PhysicalProjection { exprs: [ #25, #26, #27, #28, #29, #30, #31, #32, #33, #34, #35, #36, #37, #38, #39, #40, #9, #10, #11, #12, #13, #14, #15, #16, #17, #18, #19, #20, #21, #22, #23, #24, #0, #1, #2, #3, #4, #5, #6, #7, #8 ] } - │ │ │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #10, #11 ], right_keys: [ #0, #9 ] } - │ │ │ │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } - │ │ │ │ │ ├── PhysicalFilter { cond: Between { expr: #4, lower: Cast { cast_to: Date32, expr: "1995-01-01" }, upper: Cast { cast_to: Date32, expr: "1996-12-31" } } } - │ │ │ │ │ │ └── PhysicalScan { table: orders } - │ │ │ │ │ └── PhysicalScan { table: lineitem } - │ │ │ │ └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ │ ├── PhysicalFilter - │ │ │ │ │ ├── cond:Eq - │ │ │ │ │ │ ├── #4 - │ │ │ │ │ │ └── "ECONOMY ANODIZED STEEL" - │ │ │ │ │ └── PhysicalScan { table: part } - │ │ │ │ └── PhysicalScan { table: supplier } - │ │ │ └── PhysicalScan { table: customer } - │ │ └── PhysicalScan { table: nation } + │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #16 ], right_keys: [ #0 ] } + │ │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0, #9 ], right_keys: [ #1, #2 ] } + │ │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } + │ │ │ │ ├── PhysicalFilter + │ │ │ │ │ ├── cond:Eq + │ │ │ │ │ │ ├── #4 + │ │ │ │ │ │ └── "ECONOMY ANODIZED STEEL" + │ │ │ │ │ └── PhysicalScan { table: part } + │ │ │ │ └── PhysicalScan { table: supplier } + │ │ │ └── PhysicalScan { table: lineitem } + │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #12 ], right_keys: [ #0 ] } + │ │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #0 ] } + │ │ │ ├── PhysicalFilter { cond: Between { expr: #4, lower: Cast { cast_to: Date32, expr: "1995-01-01" }, upper: Cast { cast_to: Date32, expr: "1996-12-31" } } } + │ │ │ │ └── PhysicalScan { table: orders } + │ │ │ └── PhysicalScan { table: customer } + │ │ └── PhysicalScan { table: nation } │ └── PhysicalScan { table: nation } └── PhysicalFilter ├── cond:Eq diff --git a/optd-sqlplannertest/tests/tpch/tpch-11-15.planner.sql b/optd-sqlplannertest/tests/tpch/tpch-11-15.planner.sql index 81157d57..06345ae1 100644 --- a/optd-sqlplannertest/tests/tpch/tpch-11-15.planner.sql +++ b/optd-sqlplannertest/tests/tpch/tpch-11-15.planner.sql @@ -569,49 +569,49 @@ PhysicalSort ├── exprs:SortOrder { order: Asc } │ └── #0 └── PhysicalProjection { exprs: [ #3, #4, #5, #7, #2 ] } - └── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #0 ] } - ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ] } - │ ├── PhysicalAgg - │ │ ├── aggrs:Agg(Max) - │ │ │ └── [ #0 ] - │ │ ├── groups: [] - │ │ └── PhysicalProjection { exprs: [ #1 ] } - │ │ └── PhysicalAgg - │ │ ├── aggrs:Agg(Sum) - │ │ │ └── Mul - │ │ │ ├── #1 - │ │ │ └── Sub - │ │ │ ├── 1(float) - │ │ │ └── #2 - │ │ ├── groups: [ #0 ] - │ │ └── PhysicalProjection { exprs: [ #2, #5, #6 ] } - │ │ └── PhysicalFilter - │ │ ├── cond:And - │ │ │ ├── Geq - │ │ │ │ ├── #10 - │ │ │ │ └── 8401(i64) - │ │ │ └── Lt - │ │ │ ├── #10 - │ │ │ └── 8491(i64) - │ │ └── PhysicalScan { table: lineitem } - │ └── PhysicalAgg - │ ├── aggrs:Agg(Sum) - │ │ └── Mul - │ │ ├── #1 - │ │ └── Sub - │ │ ├── 1(float) - │ │ └── #2 - │ ├── groups: [ #0 ] - │ └── PhysicalProjection { exprs: [ #2, #5, #6 ] } - │ └── PhysicalFilter - │ ├── cond:And - │ │ ├── Geq - │ │ │ ├── #10 - │ │ │ └── 8401(i64) - │ │ └── Lt - │ │ ├── #10 - │ │ └── 8491(i64) - │ └── PhysicalScan { table: lineitem } - └── PhysicalScan { table: supplier } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ] } + ├── PhysicalAgg + │ ├── aggrs:Agg(Max) + │ │ └── [ #0 ] + │ ├── groups: [] + │ └── PhysicalProjection { exprs: [ #1 ] } + │ └── PhysicalAgg + │ ├── aggrs:Agg(Sum) + │ │ └── Mul + │ │ ├── #1 + │ │ └── Sub + │ │ ├── 1(float) + │ │ └── #2 + │ ├── groups: [ #0 ] + │ └── PhysicalProjection { exprs: [ #2, #5, #6 ] } + │ └── PhysicalFilter + │ ├── cond:And + │ │ ├── Geq + │ │ │ ├── #10 + │ │ │ └── 8401(i64) + │ │ └── Lt + │ │ ├── #10 + │ │ └── 8491(i64) + │ └── PhysicalScan { table: lineitem } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + ├── PhysicalAgg + │ ├── aggrs:Agg(Sum) + │ │ └── Mul + │ │ ├── #1 + │ │ └── Sub + │ │ ├── 1(float) + │ │ └── #2 + │ ├── groups: [ #0 ] + │ └── PhysicalProjection { exprs: [ #2, #5, #6 ] } + │ └── PhysicalFilter + │ ├── cond:And + │ │ ├── Geq + │ │ │ ├── #10 + │ │ │ └── 8401(i64) + │ │ └── Lt + │ │ ├── #10 + │ │ └── 8491(i64) + │ └── PhysicalScan { table: lineitem } + └── PhysicalScan { table: supplier } */