Skip to content

Commit

Permalink
Subquery Unnesting Agg NULL case workarounds (#257)
Browse files Browse the repository at this point in the history
- Add an outer join with the deduplicated "left side", and a
corresponding projection node, to pass along NULL values as expected.
- Add a specific workaround in the projection node to case NULL -> 0 if
we have a COUNT(*).
  • Loading branch information
jurplel authored Dec 8, 2024
1 parent 3fd39a8 commit 6696706
Show file tree
Hide file tree
Showing 9 changed files with 492 additions and 232 deletions.
103 changes: 90 additions & 13 deletions optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use optd_core::nodes::{PlanNodeOrGroup, PredNode};
// TODO: No push past join
// TODO: Sideways information passing??
use datafusion_expr::{AggregateFunction, BuiltinScalarFunction};
use optd_core::nodes::{PlanNodeOrGroup, PredNode, Value};
use optd_core::optimizer::Optimizer;
use optd_core::rules::{Rule, RuleMatcher};

use crate::plan_nodes::{
ArcDfPlanNode, ArcDfPredNode, BinOpPred, BinOpType, ColumnRefPred, ConstantPred, DependentJoin,
DfNodeType, DfPredType, DfReprPlanNode, DfReprPredNode, ExternColumnRefPred, JoinType,
ListPred, LogOpPred, LogOpType, LogicalAgg, LogicalFilter, LogicalJoin, LogicalProjection,
PredExt, RawDependentJoin,
ArcDfPlanNode, ArcDfPredNode, BinOpPred, BinOpType, ColumnRefPred, ConstantPred, ConstantType,
DependentJoin, DfNodeType, DfPredType, DfReprPlanNode, DfReprPredNode, ExternColumnRefPred,
FuncPred, FuncType, JoinType, ListPred, LogOpPred, LogOpType, LogicalAgg, LogicalFilter,
LogicalJoin, LogicalProjection, PredExt, RawDependentJoin,
};
use crate::rules::macros::define_rule;
use crate::OptimizerExt;
Expand Down Expand Up @@ -288,11 +287,8 @@ define_rule!(
/// deduplicated set).
/// For info on why we do the outer join, refer to the Unnesting Arbitrary Queries
/// talk by Mark Raasveldt. The correlated columns are covered in the original paper.
///
/// TODO: the outer join is not implemented yet, so some edge cases won't work.
/// Run SQLite tests to catch these, I guess.
fn apply_dep_join_past_agg(
_optimizer: &impl Optimizer<DfNodeType>,
optimizer: &impl Optimizer<DfNodeType>,
binding: ArcDfPlanNode,
) -> Vec<PlanNodeOrGroup<DfNodeType>> {
let join = DependentJoin::from_plan_node(binding).unwrap();
Expand All @@ -305,6 +301,8 @@ fn apply_dep_join_past_agg(
let groups = agg.groups();
let right = agg.child();

let left_schema_size = optimizer.get_schema_of(left.clone()).len();

// Cross join should always have true cond
assert!(cond == ConstantPred::bool(true).into_pred_node());

Expand Down Expand Up @@ -345,11 +343,90 @@ fn apply_dep_join_past_agg(
);

let new_dep_join =
DependentJoin::new_unchecked(left, right, cond, extern_cols, JoinType::Cross);
DependentJoin::new_unchecked(left.clone(), right, cond, extern_cols, JoinType::Cross);

let new_agg_exprs_size = new_exprs.len();
let new_agg_groups_size = new_groups.len();
let new_agg_schema_size = new_agg_groups_size + new_agg_exprs_size;
let new_agg = LogicalAgg::new(new_dep_join.into_plan_node(), new_exprs, new_groups);

vec![new_agg.into_plan_node().into()]
// Add left outer join above the agg node, joining the deduplicated set
// with the new agg node.

// Both sides will have an agg now, so we want to match the correlated
// columns from the left with those from the right
let outer_join_cond = LogOpPred::new(
LogOpType::And,
correlated_col_indices
.iter()
.enumerate()
.map(|(i, _)| {
assert!(i + left_schema_size < left_schema_size + new_agg_schema_size);
BinOpPred::new(
ColumnRefPred::new(i).into_pred_node(),
// We *prepend* the correlated columns to the groups list,
// so we don't need to take into account the old
// group-by expressions to get the corresponding correlated
// column.
ColumnRefPred::new(left_schema_size + i).into_pred_node(),
BinOpType::Eq,
)
.into_pred_node()
})
.collect(),
);

let new_outer_join = LogicalJoin::new_unchecked(
left,
new_agg.into_plan_node(),
outer_join_cond.into_pred_node(),
JoinType::LeftOuter,
);

// We have to maintain the same schema above outer join as w/o it, but we
// also need to use the groups from the deduplicated left side, and the
// exprs from the new agg node. If we use everything from the new agg,
// we don't maintain nulls as desired.
let outer_join_proj = LogicalProjection::new(
// The meaning is to take everything from the left side, and everything
// from the right side *that is not in the left side*. I am unsure
// of the correctness of this project in every case.
new_outer_join.into_plan_node(),
ListPred::new(
(0..left_schema_size)
.chain(left_schema_size + left_schema_size..left_schema_size + new_agg_schema_size)
.map(|x| {
// Count(*) special case: We want all NULLs to be transformed into 0s.
if x >= left_schema_size + new_agg_groups_size {
// If this node corresponds to an agg function, and
// it's a count(*), apply the workaround
let expr =
exprs.to_vec()[x - left_schema_size - new_agg_groups_size].clone();
if expr.typ == DfPredType::Func(FuncType::Agg(AggregateFunction::Count)) {
let expr_child = expr.child(0).child(0);

if expr_child.typ == DfPredType::Constant(ConstantType::UInt8)
&& expr_child.data == Some(Value::UInt8(1))
{
return FuncPred::new(
FuncType::Scalar(BuiltinScalarFunction::Coalesce),
ListPred::new(vec![
ColumnRefPred::new(x).into_pred_node(),
ConstantPred::int64(0).into_pred_node(),
]),
)
.into_pred_node();
}
}
}

ColumnRefPred::new(x).into_pred_node()
})
.collect(),
),
);

vec![outer_join_proj.into_plan_node().into()]
}

// Heuristics-only rule. If we don't have references to the external columns on the right side,
Expand Down
2 changes: 1 addition & 1 deletion optd-perfbench/src/datafusion_dbms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl DatafusionDBMS {

let batches = df.collect().await?;

let options = FormatOptions::default();
let options = FormatOptions::default().with_null("NULL");

for batch in batches {
let converters = batch
Expand Down
21 changes: 21 additions & 0 deletions optd-sqllogictest/slt/unnest-agg-nulls.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
include _basic_tables.slt.part

# This query has NULL values from the subquery agg. It won't work without the
# outer join fix.
# It also has an out-of-order extern column [#1]
query
select
v1,
v2,
(
select avg(v4)
from t2
where v4 = v2
) as avg_v4
from t1 order by v1;
----
1 100 NULL
2 200 200.0
2 250 250.0
3 300 300.0
3 300 300.0
22 changes: 22 additions & 0 deletions optd-sqllogictest/slt/unnest-count-star.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
include _basic_tables.slt.part

# This query uses a count(*) agg function, with nulls. Nulls should be
# transformed from NULL to 0 when they come from count(*).
# It won't work without the outer join fix + a special case on count(*).
# It also has an out-of-order extern column [#1]
query
select
v1,
v2,
(
select count(*)
from t2
where v4 = v2
) as avg_v4
from t1 order by v1;
----
1 100 0
2 200 1
2 250 1
3 300 1
3 300 1
2 changes: 1 addition & 1 deletion optd-sqllogictest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl DatafusionDBMS {
};

let batches = df.collect().await?;
let options = FormatOptions::default();
let options = FormatOptions::default().with_null("NULL");

for batch in batches {
if types.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion optd-sqlplannertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl DatafusionDBMS {

let batches = df.collect().await?;

let options = FormatOptions::default();
let options = FormatOptions::default().with_null("NULL");

for batch in batches {
let converters = batch
Expand Down
Loading

0 comments on commit 6696706

Please sign in to comment.