Skip to content

Commit

Permalink
feat: caching optd stats, 12x speedup on TPC-H SF1 (#132)
Browse files Browse the repository at this point in the history
**Summary**: Now caching the stat objects used by `OptCostModel`,
meaning we don't need to load data into DataFusion after doing it the
first time.

**Demo**:
12x speedup on TPC-H SF1 compared to not caching stats.

Caching everything _except_ optd stats takes 45.6s total.
![Screenshot 2024-03-23 at 16 59
04](https://github.com/cmu-db/optd/assets/20631215/4c199374-e2df-43fb-9eba-f348ea1e275a)

Caching everything, _including_ optd stats, takes 3.9s total.
![Screenshot 2024-03-23 at 16 57
45](https://github.com/cmu-db/optd/assets/20631215/4ef01ae9-c5a9-4fcd-bad9-c52d9a73c147)

**Details**:
* This caching is **disabled by default** to avoid accidentally using
stale stats. I added a CLI arg to enable it.
* The main challenge of this PR was making `PerTableStats` a
serializable object for `serde`.
* The serializability refactor will also help down the line when we want
to **put statistics in the catalog**, since that is fundamentally a
serialization problem too. Having `Box<dyn ...>` would make putting
stats in the catalog more difficult.
* This required a significant refactor of how the `MostCommonValues` and
`Distribution` traits are handled in `OptCostModel`. Instead of having
`Box<dyn ...>` values in `PerColumnStats` which store any object that
implements these traits, I made `PerColumnStats` a templated object.
* The one downside of this refactor is that we can no longer have a
database which uses _different_ data structures for `Distribution` (like
a t-digest for one column, a histogram for another, etc.). I didn't see
this as a big enough reason to not do the refactor because it seems like
a rare thing to do. Additionally, if we really needed to do this, we
could just make an enum that had both types.
  • Loading branch information
wangpatrick57 authored Mar 24, 2024
1 parent 3477898 commit 204758e
Show file tree
Hide file tree
Showing 18 changed files with 281 additions and 199 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions optd-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ ordered-float = "4"
tracing-subscriber = "0.3"
pretty-xmlish = "0.1"
itertools = "0.11"
serde = {version = "1.0", features = ["derive", "rc"]}
31 changes: 28 additions & 3 deletions optd-core/src/rel_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
};

use ordered_float::OrderedFloat;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

use crate::{cascades::GroupId, cost::Cost};

Expand All @@ -27,6 +28,30 @@ pub trait RelNodeTyp:
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SerializableOrderedF64(pub OrderedFloat<f64>);

impl Serialize for SerializableOrderedF64 {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// Directly serialize the inner f64 value of the OrderedFloat
self.0 .0.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for SerializableOrderedF64 {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// Deserialize an f64 and wrap it in an OrderedFloat
let float = f64::deserialize(deserializer)?;
Ok(SerializableOrderedF64(OrderedFloat(float)))
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Value {
UInt8(u8),
UInt16(u16),
Expand All @@ -37,7 +62,7 @@ pub enum Value {
Int32(i32),
Int64(i64),
Int128(i128),
Float(OrderedFloat<f64>),
Float(SerializableOrderedF64),
String(Arc<str>),
Bool(bool),
Date32(i32),
Expand All @@ -57,7 +82,7 @@ impl std::fmt::Display for Value {
Self::Int32(x) => write!(f, "{x}"),
Self::Int64(x) => write!(f, "{x}"),
Self::Int128(x) => write!(f, "{x}"),
Self::Float(x) => write!(f, "{x}"),
Self::Float(x) => write!(f, "{}", x.0),
Self::String(x) => write!(f, "\"{x}\""),
Self::Bool(x) => write!(f, "{x}"),
Self::Date32(x) => write!(f, "{x}"),
Expand Down Expand Up @@ -133,7 +158,7 @@ impl Value {

pub fn as_f64(&self) -> f64 {
match self {
Value::Float(i) => **i,
Value::Float(i) => *i.0,
_ => panic!("Value is not an f64"),
}
}
Expand Down
4 changes: 2 additions & 2 deletions optd-datafusion-repr/src/bin/test_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use optd_core::{
rules::{Rule, RuleWrapper},
};
use optd_datafusion_repr::{
cost::{OptCostModel, PerTableStats},
cost::{base_cost::DataFusionPerTableStats, OptCostModel},
plan_nodes::{
BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, JoinType, LogicalFilter, LogicalJoin,
LogicalScan, OptRelNode, OptRelNodeTyp, PlanNode,
Expand Down Expand Up @@ -45,7 +45,7 @@ pub fn main() {
Box::new(OptCostModel::new(
[("t1", 1000), ("t2", 100), ("t3", 10000)]
.into_iter()
.map(|(x, y)| (x.to_string(), PerTableStats::new(y, vec![])))
.map(|(x, y)| (x.to_string(), DataFusionPerTableStats::new(y, vec![])))
.collect(),
)),
vec![],
Expand Down
4 changes: 2 additions & 2 deletions optd-datafusion-repr/src/cost.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod adaptive_cost;
mod base_cost;
pub mod adaptive_cost;
pub mod base_cost;
mod stats;

pub use adaptive_cost::{AdaptiveCostModel, RuntimeAdaptionStorage, DEFAULT_DECAY};
Expand Down
23 changes: 14 additions & 9 deletions optd-datafusion-repr/src/cost/adaptive_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ use optd_core::{
rel_node::{RelNode, Value},
};

use super::base_cost::BaseTableStats;
use super::base_cost::{
BaseTableStats, DataFusionDistribution, DataFusionMostCommonValues, Distribution,
MostCommonValues,
};

pub type RuntimeAdaptionStorage = Arc<Mutex<RuntimeAdaptionStorageInner>>;
pub type DataFusionAdaptiveCostModel =
AdaptiveCostModel<DataFusionMostCommonValues, DataFusionDistribution>;

#[derive(Default, Debug)]
pub struct RuntimeAdaptionStorageInner {
Expand All @@ -22,13 +27,13 @@ pub struct RuntimeAdaptionStorageInner {

pub const DEFAULT_DECAY: usize = 50;

pub struct AdaptiveCostModel {
pub struct AdaptiveCostModel<M: MostCommonValues, D: Distribution> {
runtime_row_cnt: RuntimeAdaptionStorage,
base_model: OptCostModel,
base_model: OptCostModel<M, D>,
decay: usize,
}

impl CostModel<OptRelNodeTyp> for AdaptiveCostModel {
impl<M: MostCommonValues, D: Distribution> CostModel<OptRelNodeTyp> for AdaptiveCostModel<M, D> {
fn explain(&self, cost: &Cost) -> String {
self.base_model.explain(cost)
}
Expand Down Expand Up @@ -56,11 +61,11 @@ impl CostModel<OptRelNodeTyp> for AdaptiveCostModel {
{
if *iter + self.decay >= guard.iter_cnt {
let runtime_row_cnt = (*runtime_row_cnt).max(1) as f64;
return OptCostModel::cost(runtime_row_cnt, 0.0, runtime_row_cnt);
return OptCostModel::<M, D>::cost(runtime_row_cnt, 0.0, runtime_row_cnt);
}
}
}
let (mut row_cnt, compute_cost, io_cost) = OptCostModel::cost_tuple(
let (mut row_cnt, compute_cost, io_cost) = OptCostModel::<M, D>::cost_tuple(
&self
.base_model
.compute_cost(node, data, children, context.clone(), optimizer),
Expand All @@ -74,16 +79,16 @@ impl CostModel<OptRelNodeTyp> for AdaptiveCostModel {
}
}
}
OptCostModel::cost(row_cnt, compute_cost, io_cost)
OptCostModel::<M, D>::cost(row_cnt, compute_cost, io_cost)
}

fn compute_plan_node_cost(&self, node: &RelNode<OptRelNodeTyp>) -> Cost {
self.base_model.compute_plan_node_cost(node)
}
}

impl AdaptiveCostModel {
pub fn new(decay: usize, stats: BaseTableStats) -> Self {
impl<M: MostCommonValues, D: Distribution> AdaptiveCostModel<M, D> {
pub fn new(decay: usize, stats: BaseTableStats<M, D>) -> Self {
Self {
runtime_row_cnt: RuntimeAdaptionStorage::default(),
base_model: OptCostModel::new(stats),
Expand Down
Loading

0 comments on commit 204758e

Please sign in to comment.