Skip to content

Commit

Permalink
refactor(core): rm option around cost models (#243)
Browse files Browse the repository at this point in the history
`Option` was introduced during a transition time where we thought cost
model can compute the cost solely based on the children cost but it
turned out that we need the optimizer for a few derived logical
properties. We can drop them now.

Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh authored Nov 18, 2024
1 parent f4b62f3 commit 2dd2a31
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 69 deletions.
2 changes: 1 addition & 1 deletion docs/src/cost_model.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub trait CostModel<T: RelNodeTyp>: 'static + Send + Sync {
node: &T,
data: &Option<Value>,
children: &[Cost],
context: Option<RelNodeContext>,
context: RelNodeContext,
) -> Cost;
}
```
Expand Down
10 changes: 5 additions & 5 deletions optd-core/src/cascades/tasks/optimize_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ impl OptimizeInputsTask {
.iter()
.map(|x| x.expect("child winner should always have statistics?"))
.collect::<Vec<_>>(),
Some(RelNodeContext {
RelNodeContext {
group_id,
expr_id: self.expr_id,
children_group_ids: expr.children.clone(),
}),
Some(optimizer),
},
optimizer,
);
optimizer.update_group_info(
group_id,
Expand Down Expand Up @@ -197,8 +197,8 @@ impl<T: NodeType, M: Memo<T>> Task<T, M> for OptimizeInputsTask {
&expr.typ,
&preds,
&input_statistics_ref,
Some(context.clone()),
Some(optimizer),
context.clone(),
optimizer,
);
let total_cost = cost.sum(&operation_cost, &input_cost);

Expand Down
16 changes: 10 additions & 6 deletions optd-core/src/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,29 @@ pub struct Statistics(pub Box<dyn std::any::Any + Send + Sync + 'static>);
pub struct Cost(pub Vec<f64>);

pub trait CostModel<T: NodeType, M: Memo<T>>: 'static + Send + Sync {
/// Compute the cost of a single operation
/// Compute the cost of a single operation. `RelNodeContext` might be
/// optional in the future when we implement physical property enforcers.
/// If we have not decided the winner for a child group yet, the statistics
/// for that group will be `None`.
#[allow(clippy::too_many_arguments)]
fn compute_operation_cost(
&self,
node: &T,
predicates: &[ArcPredNode<T>],
children_stats: &[Option<&Statistics>],
context: Option<RelNodeContext>,
optimizer: Option<&CascadesOptimizer<T, M>>,
context: RelNodeContext,
optimizer: &CascadesOptimizer<T, M>,
) -> Cost;

/// Derive the statistics of a single operation
/// Derive the statistics of a single operation. `RelNodeContext` might be
/// optional in the future when we implement physical property enforcers.
fn derive_statistics(
&self,
node: &T,
predicates: &[ArcPredNode<T>],
children_stats: &[&Statistics],
context: Option<RelNodeContext>,
optimizer: Option<&CascadesOptimizer<T, M>>,
context: RelNodeContext,
optimizer: &CascadesOptimizer<T, M>,
) -> Statistics;

fn explain_cost(&self, cost: &Cost) -> String;
Expand Down
62 changes: 21 additions & 41 deletions optd-datafusion-repr-adv-cost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdvancedCostModel {
node: &DfNodeType,
predicates: &[ArcDfPredNode],
children_stats: &[Option<&Statistics>],
context: Option<RelNodeContext>,
optimizer: Option<&CascadesOptimizer<DfNodeType>>,
context: RelNodeContext,
optimizer: &CascadesOptimizer<DfNodeType>,
) -> Cost {
self.base_model
.compute_operation_cost(node, predicates, children_stats, context, optimizer)
Expand All @@ -73,11 +73,9 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdvancedCostModel {
node: &DfNodeType,
predicates: &[ArcDfPredNode],
children_stats: &[&Statistics],
context: Option<RelNodeContext>,
optimizer: Option<&CascadesOptimizer<DfNodeType>>,
context: RelNodeContext,
optimizer: &CascadesOptimizer<DfNodeType>,
) -> Statistics {
let context = context.as_ref();
let optimizer = optimizer.as_ref();
let row_cnts = children_stats
.iter()
.map(|child| DfCostModel::row_cnt(child))
Expand All @@ -100,12 +98,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdvancedCostModel {
DfCostModel::stat(row_cnt)
}
DfNodeType::PhysicalFilter => {
let output_schema = optimizer
.unwrap()
.get_schema_of(context.unwrap().group_id.into());
let output_column_ref = optimizer
.unwrap()
.get_column_ref_of(context.unwrap().group_id.into());
let output_schema = optimizer.get_schema_of(context.group_id.into());
let output_column_ref = optimizer.get_column_ref_of(context.group_id.into());
let row_cnt = self.stats.get_filter_row_cnt(
row_cnts[0],
output_schema,
Expand All @@ -115,18 +109,12 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdvancedCostModel {
DfCostModel::stat(row_cnt)
}
DfNodeType::PhysicalNestedLoopJoin(join_typ) => {
let output_schema = optimizer
.unwrap()
.get_schema_of(context.unwrap().group_id.into());
let output_column_ref = optimizer
.unwrap()
.get_column_ref_of(context.unwrap().group_id.into());
let left_column_ref = optimizer
.unwrap()
.get_column_ref_of(context.unwrap().children_group_ids[0].into());
let right_column_ref = optimizer
.unwrap()
.get_column_ref_of(context.unwrap().children_group_ids[1].into());
let output_schema = optimizer.get_schema_of(context.group_id.into());
let output_column_ref = optimizer.get_column_ref_of(context.group_id.into());
let left_column_ref =
optimizer.get_column_ref_of(context.children_group_ids[0].into());
let right_column_ref =
optimizer.get_column_ref_of(context.children_group_ids[1].into());
let row_cnt = self.stats.get_nlj_row_cnt(
*join_typ,
row_cnts[0],
Expand All @@ -140,18 +128,12 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdvancedCostModel {
DfCostModel::stat(row_cnt)
}
DfNodeType::PhysicalHashJoin(join_typ) => {
let output_schema = optimizer
.unwrap()
.get_schema_of(context.unwrap().group_id.into());
let output_column_ref = optimizer
.unwrap()
.get_column_ref_of(context.unwrap().group_id.into());
let left_column_ref = optimizer
.unwrap()
.get_column_ref_of(context.unwrap().children_group_ids[0].into());
let right_column_ref = optimizer
.unwrap()
.get_column_ref_of(context.unwrap().children_group_ids[1].into());
let output_schema = optimizer.get_schema_of(context.group_id.into());
let output_column_ref = optimizer.get_column_ref_of(context.group_id.into());
let left_column_ref =
optimizer.get_column_ref_of(context.children_group_ids[0].into());
let right_column_ref =
optimizer.get_column_ref_of(context.children_group_ids[1].into());
let row_cnt = self.stats.get_hash_join_row_cnt(
*join_typ,
row_cnts[0],
Expand All @@ -166,9 +148,7 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdvancedCostModel {
DfCostModel::stat(row_cnt)
}
DfNodeType::PhysicalAgg => {
let output_column_ref = optimizer
.unwrap()
.get_column_ref_of(context.unwrap().group_id.into());
let output_column_ref = optimizer.get_column_ref_of(context.group_id.into());
let row_cnt = self
.stats
.get_agg_row_cnt(predicates[1].clone(), output_column_ref);
Expand All @@ -178,8 +158,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdvancedCostModel {
node,
predicates,
children_stats,
context.cloned(),
optimizer.copied(),
context,
optimizer,
),
}
}
Expand Down
14 changes: 6 additions & 8 deletions optd-datafusion-repr/src/cost/adaptive_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ pub struct AdaptiveCostModel {
}

impl AdaptiveCostModel {
fn get_row_cnt(&self, context: &Option<RelNodeContext>) -> f64 {
fn get_row_cnt(&self, context: &RelNodeContext) -> f64 {
let guard = self.runtime_row_cnt.lock().unwrap();
if let Some((runtime_row_cnt, iter)) =
guard.history.get(&context.as_ref().unwrap().group_id)
{
if let Some((runtime_row_cnt, iter)) = guard.history.get(&context.group_id) {
if *iter + self.decay >= guard.iter_cnt {
return (*runtime_row_cnt).max(1) as f64;
}
Expand Down Expand Up @@ -67,8 +65,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdaptiveCostModel {
node: &DfNodeType,
predicates: &[ArcDfPredNode],
children: &[Option<&Statistics>],
context: Option<RelNodeContext>,
optimizer: Option<&CascadesOptimizer<DfNodeType>>,
context: RelNodeContext,
optimizer: &CascadesOptimizer<DfNodeType>,
) -> Cost {
if let DfNodeType::PhysicalScan = node {
let row_cnt = self.get_row_cnt(&context);
Expand All @@ -83,8 +81,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for AdaptiveCostModel {
node: &DfNodeType,
predicates: &[ArcDfPredNode],
children: &[&Statistics],
context: Option<RelNodeContext>,
optimizer: Option<&CascadesOptimizer<DfNodeType>>,
context: RelNodeContext,
optimizer: &CascadesOptimizer<DfNodeType>,
) -> Statistics {
if let DfNodeType::PhysicalScan = node {
let row_cnt = self.get_row_cnt(&context);
Expand Down
8 changes: 4 additions & 4 deletions optd-datafusion-repr/src/cost/base_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for DfCostModel {
node: &DfNodeType,
predicates: &[ArcDfPredNode],
children: &[&Statistics],
_context: Option<RelNodeContext>,
_optimizer: Option<&CascadesOptimizer<DfNodeType>>,
_context: RelNodeContext,
_optimizer: &CascadesOptimizer<DfNodeType>,
) -> Statistics {
match node {
DfNodeType::PhysicalScan => {
Expand Down Expand Up @@ -132,8 +132,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for DfCostModel {
node: &DfNodeType,
predicates: &[ArcDfPredNode],
children: &[Option<&Statistics>],
_context: Option<RelNodeContext>,
_optimizer: Option<&CascadesOptimizer<DfNodeType>>,
_context: RelNodeContext,
_optimizer: &CascadesOptimizer<DfNodeType>,
) -> Cost {
let row_cnts = children
.iter()
Expand Down
8 changes: 4 additions & 4 deletions optd-datafusion-repr/src/testing/dummy_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for DummyCostModel {
_: &DfNodeType,
_: &[ArcDfPredNode],
_: &[Option<&Statistics>],
_: Option<RelNodeContext>,
_: Option<&CascadesOptimizer<DfNodeType>>,
_: RelNodeContext,
_: &CascadesOptimizer<DfNodeType>,
) -> Cost {
Cost(vec![1.0])
}
Expand All @@ -31,8 +31,8 @@ impl CostModel<DfNodeType, NaiveMemo<DfNodeType>> for DummyCostModel {
_: &DfNodeType,
_: &[ArcDfPredNode],
_: &[&Statistics],
_: Option<RelNodeContext>,
_: Option<&CascadesOptimizer<DfNodeType>>,
_: RelNodeContext,
_: &CascadesOptimizer<DfNodeType>,
) -> Statistics {
Statistics(Box::new(()))
}
Expand Down

0 comments on commit 2dd2a31

Please sign in to comment.