Skip to content

Commit

Permalink
eliminate sort by physical properties
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Nov 15, 2024
1 parent 1a8e723 commit 2098a72
Show file tree
Hide file tree
Showing 22 changed files with 546 additions and 349 deletions.
17 changes: 9 additions & 8 deletions optd-core/src/cascades/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,14 +502,15 @@ impl<T: NodeType> Memo<T> for NaiveMemo<T> {
..
}) = &winner
{
let WinnerExpr::Expr { expr_id } = expr_id else {
unimplemented!()
};
assert!(
*total_weighted_cost != 0.0,
"{}",
self.expr_id_to_expr_node[expr_id]
);
if let WinnerExpr::Expr { expr_id } = expr_id {
assert!(
*total_weighted_cost != 0.0,
"{}",
self.expr_id_to_expr_node[expr_id]
);
} else {
assert!(*total_weighted_cost != 0.0);
}
}
group.winners.get_mut(&subgroup_id).unwrap().1 = winner;
}
Expand Down
53 changes: 26 additions & 27 deletions optd-core/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {

pub fn dump(&self, mut buf: impl std::fmt::Write) -> std::fmt::Result {
for group_id in self.memo.get_all_group_ids() {
writeln!(buf, "group_id={}", group_id)?;
for subgroup_id in self.memo.get_all_subgroup_ids(group_id) {
let winner_str = match &self.memo.get_group_winner(group_id, subgroup_id) {
Winner::Impossible => "winner=<impossible>".to_string(),
Expand All @@ -213,10 +214,12 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
WinnerExpr::Propagate { group_id } => {
(format!("{}", group_id), format!("{}", group_id))
}
WinnerExpr::Enforcer {} => ("enforcer".to_string(), "".to_string()),
WinnerExpr::Enforcer {} => {
("enforcer".to_string(), "enforcer".to_string())
}
};
format!(
"winner={} weighted_cost={} | {}\n cost={}\n stat={}",
"winner={} weighted_cost={} | {}\n cost={}\n stat={}",
winner_expr,
winner.total_weighted_cost,
winner_str,
Expand All @@ -225,11 +228,7 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
)
}
};
writeln!(
buf,
"group_id={} subgroup_id={} {}",
group_id, subgroup_id, winner_str
)?;
writeln!(buf, " subgroup_id={} {}", subgroup_id, winner_str)?;
let goal = self.memo.get_subgroup_goal(group_id, subgroup_id);
for (id, property) in self
.memo
Expand All @@ -238,28 +237,28 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
.iter()
.enumerate()
{
writeln!(buf, " {}={}", property.property_name(), goal[id])?;
}
let group = self.memo.get_group(group_id);
for (id, property) in self.logical_property_builders.iter().enumerate() {
writeln!(
buf,
" {}={}",
property.property_name(),
group.logical_properties[id]
)?;
}
let mut all_predicates = BTreeSet::new();
for expr_id in self.memo.get_all_exprs_in_group(group_id) {
let memo_node = self.memo.get_expr_memoed(expr_id);
for pred in &memo_node.predicates {
all_predicates.insert(*pred);
}
writeln!(buf, " expr_id={} | {}", expr_id, memo_node)?;
writeln!(buf, " {}={}", property.property_name(), goal[id])?;
}
for pred in all_predicates {
writeln!(buf, " {}={}", pred, self.memo.get_pred(pred))?;
}
let group = self.memo.get_group(group_id);
for (id, property) in self.logical_property_builders.iter().enumerate() {
writeln!(
buf,
" {}={}",
property.property_name(),
group.logical_properties[id]
)?;
}
let mut all_predicates = BTreeSet::new();
for expr_id in self.memo.get_all_exprs_in_group(group_id) {
let memo_node = self.memo.get_expr_memoed(expr_id);
for pred in &memo_node.predicates {
all_predicates.insert(*pred);
}
writeln!(buf, " expr_id={} | {}", expr_id, memo_node)?;
}
for pred in all_predicates {
writeln!(buf, " {}={}", pred, self.memo.get_pred(pred))?;
}
}
Ok(())
Expand Down
2 changes: 2 additions & 0 deletions optd-core/src/cascades/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ mod apply_rule;
mod explore_group;
mod optimize_expression;
mod optimize_group;
mod optimize_input_finalize;
mod optimize_inputs;

pub use apply_rule::ApplyRuleTask;
pub use explore_group::ExploreGroupTask;
pub use optimize_expression::OptimizeExpressionTask;
pub use optimize_group::OptimizeGroupTask;
pub use optimize_input_finalize::OptimizeInputFinalizeTask;
pub use optimize_inputs::OptimizeInputsTask;

pub trait Task<T: NodeType, M: Memo<T>>: 'static + Send + Sync {
Expand Down
1 change: 1 addition & 0 deletions optd-core/src/cascades/tasks/apply_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ impl<T: NodeType, M: Memo<T>> Task<T, M> for ApplyRuleTask {
)) as Box<dyn Task<T, M>>);
} else {
tasks.push(Box::new(OptimizeInputsTask::new(
group_id,
expr_id,
!optimizer.prop.disable_pruning,
self.subgroup_id,
Expand Down
43 changes: 33 additions & 10 deletions optd-core/src/cascades/tasks/optimize_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
// https://opensource.org/licenses/MIT.

use anyhow::Result;
use itertools::Itertools;
use tracing::trace;

use super::Task;
use crate::cascades::optimizer::GroupId;
use crate::cascades::tasks::optimize_expression::OptimizeExpressionTask;
use crate::cascades::tasks::OptimizeInputsTask;
use crate::cascades::tasks::{OptimizeInputFinalizeTask, OptimizeInputsTask};
use crate::cascades::{CascadesOptimizer, Memo, SubGroupId};
use crate::nodes::NodeType;

Expand Down Expand Up @@ -37,6 +38,12 @@ impl<T: NodeType, M: Memo<T>> Task<T, M> for OptimizeGroupTask {
}
let exprs = optimizer.get_all_exprs_in_group(self.group_id);
let mut tasks = vec![];

tasks.push(Box::new(OptimizeInputFinalizeTask::new(
self.group_id,
self.subgroup_id,
)) as Box<dyn Task<T, M>>);

let exprs_cnt = exprs.len();
for &expr in &exprs {
let typ = optimizer.get_expr_memoed(expr).typ.clone();
Expand All @@ -47,15 +54,30 @@ impl<T: NodeType, M: Memo<T>> Task<T, M> for OptimizeGroupTask {
);
}
}
// optimize with the required properties
for &expr in &exprs {
let typ = optimizer.get_expr_memoed(expr).typ.clone();
if !typ.is_logical() {
tasks.push(Box::new(OptimizeInputsTask::new(
expr,
!optimizer.prop.disable_pruning,
self.subgroup_id,
)) as Box<dyn Task<T, M>>);
let goal = optimizer
.memo()
.get_subgroup_goal(self.group_id, self.subgroup_id);
// optimize with the required properties, only do optimize_input for those nodes that can pass through
for &expr_id in &exprs {
let expr = optimizer.get_expr_memoed(expr_id);
if !expr.typ.is_logical() {
let predicates = expr
.predicates
.iter()
.map(|pred_id| optimizer.get_pred(*pred_id))
.collect_vec();
if optimizer
.memo()
.get_physical_property_builders()
.can_passthrough_any_many(expr.typ.clone(), &predicates, &goal)
{
tasks.push(Box::new(OptimizeInputsTask::new(
self.group_id,
expr_id,
!optimizer.prop.disable_pruning,
self.subgroup_id,
)) as Box<dyn Task<T, M>>);
}
}
}
// optimize with default properties
Expand All @@ -68,6 +90,7 @@ impl<T: NodeType, M: Memo<T>> Task<T, M> for OptimizeGroupTask {
let typ = optimizer.get_expr_memoed(expr).typ.clone();
if !typ.is_logical() {
tasks.push(Box::new(OptimizeInputsTask::new(
self.group_id,
expr,
!optimizer.prop.disable_pruning,
default_goal,
Expand Down
137 changes: 137 additions & 0 deletions optd-core/src/cascades/tasks/optimize_input_finalize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright (c) 2023-2024 CMU Database Group
//
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use std::sync::Arc;

use anyhow::Result;
use tracing::trace;

use super::Task;
use crate::cascades::memo::{Winner, WinnerExpr, WinnerInfo};
use crate::cascades::{CascadesOptimizer, GroupId, Memo, SubGroupId};
use crate::cost::{Cost, Statistics};
use crate::nodes::NodeType;

/// If optimize group cannot find any expression that can always pass through the required properties, this task will
/// update the winner of the group as enforcer, and compute the cost of the enforcer.
pub struct OptimizeInputFinalizeTask {
group_id: GroupId,
subgroup_id: SubGroupId,
}

impl OptimizeInputFinalizeTask {
pub fn new(group_id: GroupId, subgroup_id: SubGroupId) -> Self {
Self {
group_id,
subgroup_id,
}
}

fn update_winner_impossible<T: NodeType, M: Memo<T>>(
&self,
optimizer: &mut CascadesOptimizer<T, M>,
) {
if let Winner::Unknown = optimizer.get_group_winner(self.group_id, self.subgroup_id) {
optimizer.update_group_winner(self.group_id, self.subgroup_id, Winner::Impossible);
}
}

fn update_winner<T: NodeType, M: Memo<T>>(
&self,
statistics: Arc<Statistics>,
operation_cost: Cost,
total_cost: Cost,
optimizer: &mut CascadesOptimizer<T, M>,
) {
let winner = optimizer.get_group_winner(self.group_id, self.subgroup_id);
let cost = optimizer.cost();
let operation_weighted_cost = cost.weighted_cost(&operation_cost);
let total_weighted_cost = cost.weighted_cost(&total_cost);
let mut update_cost = false;
if let Some(winner) = winner.as_full_winner() {
if winner.total_weighted_cost > total_weighted_cost {
update_cost = true;
}
} else {
update_cost = true;
}
if update_cost {
optimizer.update_group_winner(
self.group_id,
self.subgroup_id,
Winner::Full(WinnerInfo {
expr_id: WinnerExpr::Enforcer {},
total_weighted_cost,
operation_weighted_cost,
total_cost,
operation_cost,
statistics: statistics.into(),
}),
);
}
}
}

impl<T: NodeType, M: Memo<T>> Task<T, M> for OptimizeInputFinalizeTask {
fn execute(&self, optimizer: &mut CascadesOptimizer<T, M>) -> Result<Vec<Box<dyn Task<T, M>>>> {
trace!(event = "task_begin", task = "optimize_input_finalize", subgroup_id = %self.subgroup_id);
let default_goal = optimizer
.memo()
.get_physical_property_builders()
.default_many();
let default_goal_subgroup =
optimizer.create_or_get_subgroup(self.group_id, default_goal.into());
let cost = optimizer.cost();
// check if we have a winner for the default goal subgroup
match optimizer.get_group_winner(self.group_id, default_goal_subgroup) {
Winner::Full(winner) => {
// compute the cost of enforce the property
let goal = optimizer
.memo()
.get_subgroup_goal(self.group_id, self.subgroup_id);
let statistics = winner.statistics.clone();
let mut operation_cost = winner.operation_cost.clone();
let mut total_cost = winner.total_cost.clone();
for (idx, builder) in optimizer
.memo()
.get_physical_property_builders()
.0
.iter()
.enumerate()
{
let (typ, predicates) = builder.enforce_any(goal[idx].as_ref());
// assume statistics doesn't change when applying enforcers
let op_cost = cost.compute_operation_cost(
&typ,
&predicates,
&[Some(&statistics)],
None, // enforcers are not part of the memo table, and therefore there's no context
Some(optimizer),
);
cost.accumulate(&mut operation_cost, &op_cost);
cost.accumulate(&mut total_cost, &op_cost);
}
trace!(
event = "compute_cost",
task = "optimize_input_finalize",
subgroup_id = %self.subgroup_id,
weighted_cost_so_far = cost.weighted_cost(&total_cost));
self.update_winner(statistics, operation_cost, total_cost, optimizer);
}
Winner::Impossible => {
self.update_winner_impossible(optimizer);
}
Winner::Unknown => {
unreachable!("winner should be known at this point")
}
}
trace!(event = "task_end", task = "optimize_input_finalize", subgroup_id = %self.subgroup_id);
Ok(Vec::new())
}

fn describe(&self) -> String {
format!("optimize_inputs_finalize {}", self.subgroup_id)
}
}
Loading

0 comments on commit 2098a72

Please sign in to comment.