Skip to content

Commit

Permalink
Fixes for TPC-H Q17 (#255)
Browse files Browse the repository at this point in the history
Two bugs:
- `dep_initial_distinct` only worked when the correlated columns were
exactly the leading columns, i.e. `[#0, #1, #2]` and so on.
- `dep_join_past_agg` was pushing the actual correlated column number
instead of the *index* of the correlated column number. The old code only
worked in the same cases as the bug above (`[#0, #1, #2]` correlated columns),
so it broke for the same reason, as far as I can tell.
  • Loading branch information
jurplel authored Dec 6, 2024
1 parent 228e7ba commit 93aa6be
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 22 deletions.
40 changes: 24 additions & 16 deletions optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,21 @@ fn apply_dep_initial_distinct(

// Our join condition is going to make sure that all of the correlated columns
// in the right side are equal to their equivalent columns in the left side.
// (they will have the same index, just shifted over)
//
// If we have correlated columns [#16, #17], we want our condition to be:
// #16 = #0 AND #17 = #1
//
// This is because the aggregate we install on the right side will map the
// correlated columns to their respective indices as shown.
let join_cond = LogOpPred::new(
LogOpType::And,
(0..correlated_col_indices.len())
.map(|i| {
correlated_col_indices
.iter()
.enumerate()
.map(|(i, x)| {
assert!(i + left_schema_size < left_schema_size + right_schema_size);
BinOpPred::new(
ColumnRefPred::new(i).into_pred_node(),
ColumnRefPred::new(*x).into_pred_node(),
ColumnRefPred::new(i + left_schema_size).into_pred_node(),
BinOpType::Eq,
)
Expand Down Expand Up @@ -177,16 +184,18 @@ fn apply_dep_join_past_proj(
let cond = join.cond();
let extern_cols = join.extern_cols();
let proj = LogicalProjection::from_plan_node(right.unwrap_plan_node()).unwrap();
let proj_exprs = proj.exprs();
let right = proj.child();

// TODO: can we have external columns in projection node? I don't think so?
// Cross join should always have true cond
assert!(cond == ConstantPred::bool(true).into_pred_node());
let left_schema_len = optimizer.get_schema_of(left.clone()).len();
let right_schema_len = optimizer.get_schema_of(right.clone()).len();

let right_cols_proj =
(0..right_schema_len).map(|x| ColumnRefPred::new(x + left_schema_len).into_pred_node());
let right_cols_proj = proj_exprs.to_vec().into_iter().map(|x| {
x.rewrite_column_refs(|col| Some(col + left_schema_len))
.unwrap()
});

let left_cols_proj = (0..left_schema_len).map(|x| ColumnRefPred::new(x).into_pred_node());
let new_proj_exprs = ListPred::new(
Expand Down Expand Up @@ -281,7 +290,7 @@ define_rule!(
/// talk by Mark Raasveldt. The correlated columns are covered in the original paper.
///
/// TODO: the outer join is not implemented yet, so some edge cases won't work.
/// Run SQList tests to catch these, I guess.
/// Run SQLite tests to catch these, I guess.
fn apply_dep_join_past_agg(
_optimizer: &impl Optimizer<DfNodeType>,
binding: ArcDfPlanNode,
Expand Down Expand Up @@ -310,15 +319,14 @@ fn apply_dep_join_past_agg(
})
.collect::<Vec<_>>();

// We need to group by all correlated columns.
// In our initial distinct step, we installed an agg node that groups by all correlated columns.
// Keeping this in mind, we only need to append a sequential number for each correlated column,
// as these will correspond to the outputs of the agg node.
let new_groups = ListPred::new(
groups
.to_vec()
.into_iter()
.map(|x| {
x.rewrite_column_refs(|col| Some(col + correlated_col_indices.len()))
.unwrap()
})
.chain(correlated_col_indices.iter().map(|x| {
(0..correlated_col_indices.len())
.map(|x| ColumnRefPred::new(x).into_pred_node())
.chain(groups.to_vec().into_iter().map(|x| {
x.rewrite_column_refs(|col| Some(col + correlated_col_indices.len()))
.unwrap()
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ where
l_partkey = p_partkey
);
----
863.2285714285714285714285714
863.2285714285715
20 changes: 20 additions & 0 deletions optd-sqllogictest/slt/unnest-extern-out-of-order.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
include _tpch_tables.slt.part

# A query with a correlated subquery that retrieves columns out of order
# i.e. the extern columns are not of the format [#0, #1, ...]
# This query has extern columns [#1]
query
select
l_orderkey,
l_partkey,
l_extendedprice,
(
select avg(p_size)
from part
where p_partkey = l_partkey
) as avg_extendedprice
from lineitem
where l_extendedprice > 55000;
----
1121 200 55010.00 22.0
4931 200 55010.00 22.0
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalAgg
├── exprs:Agg(Sum)
│ └── [ Cast { cast_to: Int64, child: #2 } ]
├── groups: [ #1 ]
├── groups: [ #0 ]
└── LogicalFilter
├── cond:Eq
│ ├── #1
Expand All @@ -64,7 +64,7 @@ PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=18005,io=3000}, stat: {ro
│ └── PhysicalAgg
│ ├── aggrs:Agg(Sum)
│ │ └── [ Cast { cast_to: Int64, child: #2 } ]
│ ├── groups: [ #1 ]
│ ├── groups: [ #0 ]
│ ├── cost: {compute=14000,io=2000}
│ ├── stat: {row_cnt=1000}
│ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=6000,io=2000}, stat: {row_cnt=1000} }
Expand Down Expand Up @@ -118,7 +118,7 @@ LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalAgg
├── exprs:Agg(Sum)
│ └── [ Cast { cast_to: Int64, child: #2 } ]
├── groups: [ #1 ]
├── groups: [ #0 ]
└── LogicalProjection { exprs: [ #0, #1, #2, #3, #4 ] }
└── LogicalFilter
├── cond:And
Expand All @@ -145,7 +145,7 @@ PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=21005,io=4000}, stat: {ro
│ └── PhysicalAgg
│ ├── aggrs:Agg(Sum)
│ │ └── [ Cast { cast_to: Int64, child: #2 } ]
│ ├── groups: [ #1 ]
│ ├── groups: [ #0 ]
│ ├── cost: {compute=17000,io=3000}
│ ├── stat: {row_cnt=1000}
│ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #2 ], right_keys: [ #0 ], cost: {compute=9000,io=3000}, stat: {row_cnt=1000} }
Expand Down
2 changes: 1 addition & 1 deletion optd-sqlplannertest/tests/tpch/tpch-01-05.planner.sql
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ PhysicalLimit { skip: 0(u64), fetch: 100(u64) }
└── PhysicalAgg
├── aggrs:Agg(Min)
│ └── [ #4 ]
├── groups: [ #1 ]
├── groups: [ #0 ]
└── PhysicalFilter
├── cond:And
│ ├── Eq
Expand Down

0 comments on commit 93aa6be

Please sign in to comment.