Feat: two stage heuristic (#139)
This PR adds property inference for the heuristic optimizer in optd-core.

It splits optimization into two passes, heuristic and cascades. Heuristic rules
can now run in the first (heuristic) pass by registering them via
default_heuristic_rules, and these rules can traverse the whole
plan tree, since there are no placeholders in the heuristic stage.

The properties are not shared between the heuristic pass and the cascades
pass, though; they are recalculated in cascades.
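
For reference, the two passes now run back to back in the datafusion bridge; this sketch mirrors the optd-datafusion-bridge change in this diff:

if optimizer.is_heuristic_enabled() {
    // pass 1: heuristic rewrites over the full plan tree (no placeholders)
    optd_rel = optimizer.heuristic_optimize(optd_rel);
}
// pass 2: cascades; properties are derived again here rather than reused
let (group_id, optimized_rel, meta) = optimizer.cascades_optimize(optd_rel)?;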

The goal of this PR is to enable more complicated heuristic rules: one can
register a rule as either heuristic or cascades to test its usability and
performance in each mode, as sketched below.
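
For example, using the constructors that appear in this diff (EliminateLimitRule is picked arbitrarily here; whether a given rule works well in cascades is exactly what this setup lets you test):

// register as a heuristic rule, to run in the first pass:
let heuristic_rules: Vec<Arc<dyn Rule<OptRelNodeTyp, HeuristicsOptimizer<OptRelNodeTyp>>>> =
    vec![Arc::new(EliminateLimitRule::new())];
// or as a cascades rule, via the rule wrapper:
let cascades_rules = vec![RuleWrapper::new_cascades(Arc::new(EliminateLimitRule::new()))];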

The logic for rule creation is the same as with the previous heuristic rule
wrapper: a heuristic rule returns either 0 or 1 nodes. Returning 1 node means
the rule applied successfully; returning 0 nodes means some constraint of the
rule was not satisfied, and the heuristic optimizer keeps the original node.
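
A minimal, self-contained sketch of that contract (simplified stand-in types, not optd's actual Rule trait):

struct PlanNode;

trait HeuristicRule {
    // Returns at most one rewritten node; an empty Vec means the rule
    // declined and the original node should be kept.
    fn apply(&self, node: &PlanNode) -> Vec<PlanNode>;
}

fn apply_one(rule: &dyn HeuristicRule, node: PlanNode) -> PlanNode {
    let mut results = rule.apply(&node);
    assert!(results.len() <= 1);
    if results.is_empty() {
        node // rule declined: keep the original node
    } else {
        results.remove(0)
    }
}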

---------

Signed-off-by: AveryQi115 <[email protected]>
AveryQi115 authored Mar 26, 2024
1 parent 91a7879 commit 4a8da7e
Showing 4 changed files with 145 additions and 38 deletions.
70 changes: 62 additions & 8 deletions optd-core/src/heuristics/optimizer.rs
@@ -1,9 +1,12 @@
use std::{collections::HashMap, sync::Arc};

use anyhow::Result;
use itertools::Itertools;
use std::any::Any;

use crate::{
optimizer::Optimizer,
property::PropertyBuilderAny,
rel_node::{RelNode, RelNodeRef, RelNodeTyp},
rules::{Rule, RuleMatcher},
};
@@ -16,6 +19,8 @@ pub enum ApplyOrder {
pub struct HeuristicsOptimizer<T: RelNodeTyp> {
rules: Arc<[Arc<dyn Rule<T, Self>>]>,
apply_order: ApplyOrder,
property_builders: Arc<[Box<dyn PropertyBuilderAny<T>>]>,
properties: HashMap<RelNodeRef<T>, Arc<[Box<dyn Any + Send + Sync + 'static>]>>,
}

fn match_node<T: RelNodeTyp>(
@@ -102,10 +107,16 @@ fn match_and_pick<T: RelNodeTyp>(
}

impl<T: RelNodeTyp> HeuristicsOptimizer<T> {
pub fn new_with_rules(rules: Vec<Arc<dyn Rule<T, Self>>>, apply_order: ApplyOrder) -> Self {
pub fn new_with_rules(
rules: Vec<Arc<dyn Rule<T, Self>>>,
apply_order: ApplyOrder,
property_builders: Arc<[Box<dyn PropertyBuilderAny<T>>]>,
) -> Self {
Self {
rules: rules.into(),
apply_order,
property_builders,
properties: HashMap::new(),
}
}

@@ -122,8 +133,10 @@ impl<T: RelNodeTyp> HeuristicsOptimizer<T> {
let matcher = rule.matcher();
if let Some(picks) = match_and_pick(matcher, root_rel.clone()) {
let mut results = rule.apply(self, picks);
assert_eq!(results.len(), 1);
root_rel = results.remove(0).into();
assert!(results.len() <= 1);
if !results.is_empty() {
root_rel = results.remove(0).into();
}
}
}
Ok(root_rel)
@@ -141,20 +154,61 @@ impl<T: RelNodeTyp> HeuristicsOptimizer<T> {
}
.into(),
)?;
self.infer_properties(root_rel.clone());
self.properties.insert(
node.clone(),
self.properties.get(&root_rel.clone()).unwrap().clone(),
);
Ok(node)
}
ApplyOrder::TopDown => {
self.infer_properties(root_rel.clone());
let root_rel = self.apply_rules(root_rel)?;
let optimized_children = self.optimize_inputs(&root_rel.children)?;
Ok(RelNode {
let node: Arc<RelNode<T>> = RelNode {
typ: root_rel.typ.clone(),
children: optimized_children,
data: root_rel.data.clone(),
}
.into())
.into();
self.infer_properties(root_rel.clone());
self.properties.insert(
node.clone(),
self.properties.get(&root_rel.clone()).unwrap().clone(),
);
Ok(node)
}
}
}

fn infer_properties(&mut self, root_rel: RelNodeRef<T>) {
if self.properties.contains_key(&root_rel) {
return;
}

let child_properties = root_rel
.children
.iter()
.map(|child| {
self.infer_properties((*child).clone());
self.properties.get(child).unwrap().clone()
})
.collect_vec();
let mut props = Vec::with_capacity(self.property_builders.len());
for (id, builder) in self.property_builders.iter().enumerate() {
let child_properties = child_properties
.iter()
.map(|x| x[id].as_ref() as &dyn std::any::Any)
.collect::<Vec<_>>();
let prop = builder.derive_any(
root_rel.typ.clone(),
root_rel.data.clone(),
child_properties.as_slice(),
);
props.push(prop);
}
self.properties.insert(root_rel.clone(), props.into());
}
}

impl<T: RelNodeTyp> Optimizer<T> for HeuristicsOptimizer<T> {
@@ -167,8 +221,8 @@ impl<T: RelNodeTyp> Optimizer<T> for HeuristicsOptimizer<T> {
root_rel: RelNodeRef<T>,
idx: usize,
) -> P::Prop {
let _ = root_rel;
let _ = idx;
unimplemented!()
let props = self.properties.get(&root_rel).unwrap();
let prop = props[idx].as_ref();
prop.downcast_ref::<P::Prop>().unwrap().clone()
}
}
11 changes: 8 additions & 3 deletions optd-datafusion-bridge/src/lib.rs
@@ -218,7 +218,7 @@ impl OptdQueryPlanner {
optimizer_name: "datafusion".to_string(),
}));
}
let optd_rel = ctx.conv_into_optd(logical_plan)?;
let mut optd_rel = ctx.conv_into_optd(logical_plan)?;
if let Some(explains) = &mut explains {
explains.push(StringifiedPlan::new(
PlanType::OptimizedLogicalPlan {
@@ -230,7 +230,12 @@ impl OptdQueryPlanner {
));
}
let mut optimizer = self.optimizer.lock().unwrap().take().unwrap();
let (group_id, optimized_rel, meta) = optimizer.optimize(optd_rel)?;

if optimizer.is_heuristic_enabled() {
optd_rel = optimizer.heuristic_optimize(optd_rel);
}

let (group_id, optimized_rel, meta) = optimizer.cascades_optimize(optd_rel)?;

if let Some(explains) = &mut explains {
explains.push(StringifiedPlan::new(
@@ -253,7 +258,7 @@
},
));
let bindings = optimizer
.optd_optimizer()
.optd_cascades_optimizer()
.get_all_group_bindings(group_id, true);
let mut join_orders = BTreeSet::new();
let mut logical_join_orders = BTreeSet::new();
1 change: 1 addition & 0 deletions optd-datafusion-repr/src/bin/test_optimize.rs
@@ -92,6 +92,7 @@ pub fn main() {
Arc::new(HashJoinRule::new()),
],
optd_core::heuristics::ApplyOrder::BottomUp,
Arc::new([]),
);
let node = optimizer.optimize(fnal.0.into_rel_node()).unwrap();
println!(
101 changes: 74 additions & 27 deletions optd-datafusion-repr/src/lib.rs
@@ -9,8 +9,11 @@ use cost::{
};
use optd_core::{
cascades::{CascadesOptimizer, GroupId, OptimizerProperties},
heuristics::{ApplyOrder, HeuristicsOptimizer},
optimizer::Optimizer,
property::PropertyBuilderAny,
rel_node::RelNodeMetaMap,
rules::RuleWrapper,
rules::{Rule, RuleWrapper},
};

use plan_nodes::{OptRelNodeRef, OptRelNodeTyp};
@@ -33,9 +36,11 @@
pub mod rules;

pub struct DatafusionOptimizer {
optimizer: CascadesOptimizer<OptRelNodeTyp>,
hueristic_optimizer: HeuristicsOptimizer<OptRelNodeTyp>,
cascades_optimizer: CascadesOptimizer<OptRelNodeTyp>,
pub runtime_statistics: RuntimeAdaptionStorage,
enable_adaptive: bool,
enable_heuristic: bool,
}

impl DatafusionOptimizer {
@@ -47,26 +52,43 @@ impl DatafusionOptimizer {
self.enable_adaptive
}

pub fn optd_optimizer(&self) -> &CascadesOptimizer<OptRelNodeTyp> {
&self.optimizer
pub fn enable_heuristic(&mut self, enable: bool) {
self.enable_heuristic = enable;
}

pub fn is_heuristic_enabled(&self) -> bool {
self.enable_heuristic
}

pub fn optd_cascades_optimizer(&self) -> &CascadesOptimizer<OptRelNodeTyp> {
&self.cascades_optimizer
}

pub fn optd_hueristic_optimizer(&self) -> &HeuristicsOptimizer<OptRelNodeTyp> {
&self.hueristic_optimizer
}

pub fn optd_optimizer_mut(&mut self) -> &mut CascadesOptimizer<OptRelNodeTyp> {
&mut self.optimizer
&mut self.cascades_optimizer
}

pub fn default_rules() -> Vec<Arc<RuleWrapper<OptRelNodeTyp, CascadesOptimizer<OptRelNodeTyp>>>>
{
pub fn default_heuristic_rules(
) -> Vec<Arc<dyn Rule<OptRelNodeTyp, HeuristicsOptimizer<OptRelNodeTyp>>>> {
vec![
Arc::new(SimplifyFilterRule::new()),
Arc::new(SimplifyJoinCondRule::new()),
Arc::new(EliminateFilterRule::new()),
Arc::new(EliminateJoinRule::new()),
Arc::new(EliminateLimitRule::new()),
Arc::new(EliminateDuplicatedSortExprRule::new()),
Arc::new(EliminateDuplicatedAggExprRule::new()),
]
}

pub fn default_cascades_rules(
) -> Vec<Arc<RuleWrapper<OptRelNodeTyp, CascadesOptimizer<OptRelNodeTyp>>>> {
let rules = PhysicalConversionRule::all_conversions();
let mut rule_wrappers = vec![
RuleWrapper::new_heuristic(Arc::new(SimplifyFilterRule::new())),
RuleWrapper::new_heuristic(Arc::new(SimplifyJoinCondRule::new())),
RuleWrapper::new_heuristic(Arc::new(EliminateFilterRule::new())),
RuleWrapper::new_heuristic(Arc::new(EliminateJoinRule::new())),
RuleWrapper::new_heuristic(Arc::new(EliminateLimitRule::new())),
RuleWrapper::new_heuristic(Arc::new(EliminateDuplicatedSortExprRule::new())),
RuleWrapper::new_heuristic(Arc::new(EliminateDuplicatedAggExprRule::new())),
];
let mut rule_wrappers = vec![];
for rule in rules {
rule_wrappers.push(RuleWrapper::new_cascades(rule));
}
@@ -86,23 +108,34 @@ impl DatafusionOptimizer {
stats: DataFusionBaseTableStats,
enable_adaptive: bool,
) -> Self {
let rules = Self::default_rules();
let cascades_rules = Self::default_cascades_rules();
let heuristic_rules = Self::default_heuristic_rules();
let property_builders: Arc<[Box<dyn PropertyBuilderAny<OptRelNodeTyp>>]> = Arc::new([
Box::new(SchemaPropertyBuilder::new(catalog.clone())),
Box::new(ColumnRefPropertyBuilder::new(catalog.clone())),
]);
let cost_model = AdaptiveCostModel::new(DEFAULT_DECAY, stats);
Self {
runtime_statistics: cost_model.get_runtime_map(),
optimizer: CascadesOptimizer::new_with_prop(
rules,
cascades_optimizer: CascadesOptimizer::new_with_prop(
cascades_rules,
Box::new(cost_model),
vec![
Box::new(SchemaPropertyBuilder::new(catalog.clone())),
Box::new(ColumnRefPropertyBuilder::new(catalog)),
Box::new(ColumnRefPropertyBuilder::new(catalog.clone())),
],
OptimizerProperties {
partial_explore_iter: Some(1 << 20),
partial_explore_space: Some(1 << 10),
},
),
hueristic_optimizer: HeuristicsOptimizer::new_with_rules(
heuristic_rules,
ApplyOrder::BottomUp,
property_builders.clone(),
),
enable_adaptive,
enable_heuristic: true,
}
}

@@ -140,31 +173,45 @@
);
Self {
runtime_statistics,
optimizer,
cascades_optimizer: optimizer,
enable_adaptive: true,
enable_heuristic: false,
hueristic_optimizer: HeuristicsOptimizer::new_with_rules(
vec![],
ApplyOrder::BottomUp,
Arc::new([]),
),
}
}

pub fn optimize(
pub fn heuristic_optimize(&mut self, root_rel: OptRelNodeRef) -> OptRelNodeRef {
self.hueristic_optimizer
.optimize(root_rel)
.expect("heuristics returns error")
}

pub fn cascades_optimize(
&mut self,
root_rel: OptRelNodeRef,
) -> Result<(GroupId, OptRelNodeRef, RelNodeMetaMap)> {
if self.enable_adaptive {
self.runtime_statistics.lock().unwrap().iter_cnt += 1;
self.optimizer.step_clear_winner();
self.cascades_optimizer.step_clear_winner();
} else {
self.optimizer.step_clear();
self.cascades_optimizer.step_clear();
}

let group_id = self.optimizer.step_optimize_rel(root_rel)?;
let group_id = self.cascades_optimizer.step_optimize_rel(root_rel)?;

let mut meta = Some(HashMap::new());
let optimized_rel = self.optimizer.step_get_optimize_rel(group_id, &mut meta)?;
let optimized_rel = self
.cascades_optimizer
.step_get_optimize_rel(group_id, &mut meta)?;

Ok((group_id, optimized_rel, meta.unwrap()))
}

pub fn dump(&self, group_id: Option<GroupId>) {
self.optimizer.dump(group_id)
self.cascades_optimizer.dump(group_id)
}
}
