Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subquery Unnesting Agg NULL case workarounds #257

Merged
merged 29 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
99ed87e
Add all tpch queries (from risinglightdb tests)
jurplel Dec 6, 2024
5e4f33a
Newline normalization
jurplel Dec 6, 2024
298ab67
Fix BinOp schema property issue
jurplel Dec 6, 2024
6547052
tpch q11 fix
jurplel Dec 6, 2024
b62ce72
Merge branch 'bowad/tpch-q11-fix' of github.com:cmu-db/optd into bowa…
jurplel Dec 6, 2024
768982d
Update sqlplannertest plans
jurplel Dec 6, 2024
52359fa
Merge branch 'main' into bowad/tpch-q11-fix
jurplel Dec 6, 2024
01774d1
Delete q11 again
jurplel Dec 6, 2024
4c2c7fd
fix a couple of depjoin agg pushdown bugs
jurplel Dec 6, 2024
66a310d
un-disable tpch-q17
jurplel Dec 6, 2024
21d490a
Fix another bug w/ init distinct
jurplel Dec 6, 2024
f4108b3
Write comment for init distinct fix
jurplel Dec 6, 2024
a945abf
Merge
jurplel Dec 6, 2024
f055605
Update sqlplannertest plans
jurplel Dec 6, 2024
0dcce3d
Add test for out-of-order extern columns in subquery
jurplel Dec 6, 2024
b468692
add unnest test w/ nulls from agg
jurplel Dec 6, 2024
78c2b5e
Implement outer join agg null fix
jurplel Dec 6, 2024
9edf1af
Count(*) fix
jurplel Dec 7, 2024
bab134f
Merge branch 'main' into bowad/unnest-agg-null-fix
jurplel Dec 7, 2024
b0b9e4e
planner test updates
jurplel Dec 7, 2024
9778e25
clippy
jurplel Dec 7, 2024
87c9c09
Unused variable
jurplel Dec 7, 2024
c2c908e
Fix NULL not printing
jurplel Dec 7, 2024
b9c70ee
merge w/ main (and tests are failing)
jurplel Dec 7, 2024
0fc8cb9
Fix not passing all columns through (I think this was a bug?)
jurplel Dec 7, 2024
c2082da
clippy
jurplel Dec 7, 2024
34d6d88
Add comment to projection
jurplel Dec 8, 2024
e8b1d5b
Remove extraneous sqlplannertest result
jurplel Dec 8, 2024
b7a18f4
Grammar fix in test files
jurplel Dec 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 90 additions & 13 deletions optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use optd_core::nodes::{PlanNodeOrGroup, PredNode};
// TODO: No push past join
// TODO: Sideways information passing??
use datafusion_expr::{AggregateFunction, BuiltinScalarFunction};
use optd_core::nodes::{PlanNodeOrGroup, PredNode, Value};
use optd_core::optimizer::Optimizer;
use optd_core::rules::{Rule, RuleMatcher};

use crate::plan_nodes::{
ArcDfPlanNode, ArcDfPredNode, BinOpPred, BinOpType, ColumnRefPred, ConstantPred, DependentJoin,
DfNodeType, DfPredType, DfReprPlanNode, DfReprPredNode, ExternColumnRefPred, JoinType,
ListPred, LogOpPred, LogOpType, LogicalAgg, LogicalFilter, LogicalJoin, LogicalProjection,
PredExt, RawDependentJoin,
ArcDfPlanNode, ArcDfPredNode, BinOpPred, BinOpType, ColumnRefPred, ConstantPred, ConstantType,
DependentJoin, DfNodeType, DfPredType, DfReprPlanNode, DfReprPredNode, ExternColumnRefPred,
FuncPred, FuncType, JoinType, ListPred, LogOpPred, LogOpType, LogicalAgg, LogicalFilter,
LogicalJoin, LogicalProjection, PredExt, RawDependentJoin,
};
use crate::rules::macros::define_rule;
use crate::OptimizerExt;
Expand Down Expand Up @@ -288,11 +287,8 @@ define_rule!(
/// deduplicated set).
/// For info on why we do the outer join, refer to the Unnesting Arbitrary Queries
/// talk by Mark Raasveldt. The correlated columns are covered in the original paper.
///
/// TODO: the outer join is not implemented yet, so some edge cases won't work.
/// Run SQLite tests to catch these, I guess.
fn apply_dep_join_past_agg(
_optimizer: &impl Optimizer<DfNodeType>,
optimizer: &impl Optimizer<DfNodeType>,
binding: ArcDfPlanNode,
) -> Vec<PlanNodeOrGroup<DfNodeType>> {
let join = DependentJoin::from_plan_node(binding).unwrap();
Expand All @@ -305,6 +301,8 @@ fn apply_dep_join_past_agg(
let groups = agg.groups();
let right = agg.child();

let left_schema_size = optimizer.get_schema_of(left.clone()).len();

// Cross join should always have true cond
assert!(cond == ConstantPred::bool(true).into_pred_node());

Expand Down Expand Up @@ -345,11 +343,90 @@ fn apply_dep_join_past_agg(
);

let new_dep_join =
DependentJoin::new_unchecked(left, right, cond, extern_cols, JoinType::Cross);
DependentJoin::new_unchecked(left.clone(), right, cond, extern_cols, JoinType::Cross);

let new_agg_exprs_size = new_exprs.len();
let new_agg_groups_size = new_groups.len();
let new_agg_schema_size = new_agg_groups_size + new_agg_exprs_size;
let new_agg = LogicalAgg::new(new_dep_join.into_plan_node(), new_exprs, new_groups);

vec![new_agg.into_plan_node().into()]
// Add left outer join above the agg node, joining the deduplicated set
// with the new agg node.

// Both sides will have an agg now, so we want to match the correlated
// columns from the left with those from the right
let outer_join_cond = LogOpPred::new(
LogOpType::And,
correlated_col_indices
.iter()
.enumerate()
.map(|(i, _)| {
assert!(i + left_schema_size < left_schema_size + new_agg_schema_size);
BinOpPred::new(
ColumnRefPred::new(i).into_pred_node(),
// We *prepend* the correlated columns to the groups list,
// so we don't need to take into account the old
// group-by expressions to get the corresponding correlated
// column.
ColumnRefPred::new(left_schema_size + i).into_pred_node(),
BinOpType::Eq,
)
.into_pred_node()
})
.collect(),
);

let new_outer_join = LogicalJoin::new_unchecked(
left,
new_agg.into_plan_node(),
outer_join_cond.into_pred_node(),
JoinType::LeftOuter,
);

// We have to maintain the same schema above outer join as w/o it, but we
// also need to use the groups from the deduplicated left side, and the
// exprs from the new agg node. If we use everything from the new agg,
// we don't maintain nulls as desired.
let outer_join_proj = LogicalProjection::new(
// The meaning is to take everything from the left side, and everything
// from the right side *that is not in the left side*. I am unsure
// of the correctness of this projection in every case.
new_outer_join.into_plan_node(),
ListPred::new(
(0..left_schema_size)
.chain(left_schema_size + left_schema_size..left_schema_size + new_agg_schema_size)
Copy link
Member Author

@jurplel jurplel Dec 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the one part whose correctness I am not sure of.

The meaning is to take:

  • Everything from the left side
  • Everything from the right side that is not in the left side.

I found this to be correct in all the tests that I wrote.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything from the right side that is not in the left side.

Maybe we need to figure out the exact meaning of this -- do you want to know if two columns refer to the same table-column?

Copy link
Member Author

@jurplel jurplel Dec 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so after thinking about it a bit longer, my description is correct, but there is an easier way to think about it:

The right side of the outer join is going to contain the subquery (on its right) and the deduplicated part (on its left).

The left side is going to be a copy of the deduplicated part that is the left child of the right child.

So, our projection will be the deduplicated copy (that will be outer joined to resolve the null problems) and the non-deduplicated part from the right side.

.map(|x| {
// Count(*) special case: We want all NULLs to be transformed into 0s.
if x >= left_schema_size + new_agg_groups_size {
// If this node corresponds to an agg function, and
// it's a count(*), apply the workaround
let expr =
exprs.to_vec()[x - left_schema_size - new_agg_groups_size].clone();
if expr.typ == DfPredType::Func(FuncType::Agg(AggregateFunction::Count)) {
let expr_child = expr.child(0).child(0);

if expr_child.typ == DfPredType::Constant(ConstantType::UInt8)
&& expr_child.data == Some(Value::UInt8(1))
{
return FuncPred::new(
FuncType::Scalar(BuiltinScalarFunction::Coalesce),
ListPred::new(vec![
ColumnRefPred::new(x).into_pred_node(),
ConstantPred::int64(0).into_pred_node(),
]),
)
.into_pred_node();
}
}
}

ColumnRefPred::new(x).into_pred_node()
})
.collect(),
),
);

vec![outer_join_proj.into_plan_node().into()]
}

// Heuristics-only rule. If we don't have references to the external columns on the right side,
Expand Down
2 changes: 1 addition & 1 deletion optd-perfbench/src/datafusion_dbms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl DatafusionDBMS {

let batches = df.collect().await?;

let options = FormatOptions::default();
let options = FormatOptions::default().with_null("NULL");

for batch in batches {
let converters = batch
Expand Down
21 changes: 21 additions & 0 deletions optd-sqllogictest/slt/unnest-agg-nulls.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
include _basic_tables.slt.part

# This query has NULL values from the subquery agg. It won't work without the
# outer join fix.
# It also has an out-of-order extern column [#1]
query
select
v1,
v2,
(
select avg(v4)
from t2
where v4 = v2
) as avg_v4
from t1 order by v1;
----
1 100 NULL
2 200 200.0
2 250 250.0
3 300 300.0
3 300 300.0
22 changes: 22 additions & 0 deletions optd-sqllogictest/slt/unnest-count-star.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
include _basic_tables.slt.part

# This query uses a count(*) agg function, with nulls. Nulls should be
# transformed from NULL to 0 when they come from count(*).
# It won't work without the outer join fix + a special case on count(*).
# It also has an out-of-order extern column [#1]
query
select
v1,
v2,
(
select count(*)
from t2
where v4 = v2
) as avg_v4
from t1 order by v1;
----
1 100 0
2 200 1
2 250 1
3 300 1
3 300 1
2 changes: 1 addition & 1 deletion optd-sqllogictest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl DatafusionDBMS {
};

let batches = df.collect().await?;
let options = FormatOptions::default();
let options = FormatOptions::default().with_null("NULL");

for batch in batches {
if types.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion optd-sqlplannertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl DatafusionDBMS {

let batches = df.collect().await?;

let options = FormatOptions::default();
let options = FormatOptions::default().with_null("NULL");

for batch in batches {
let converters = batch
Expand Down
Loading
Loading