diff --git a/optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs b/optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs index 6e60f122..1c087554 100644 --- a/optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs +++ b/optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs @@ -116,14 +116,21 @@ fn apply_dep_initial_distinct( // Our join condition is going to make sure that all of the correlated columns // in the right side are equal to their equivalent columns in the left side. - // (they will have the same index, just shifted over) + // + // If we have correlated columns [#16, #17], we want our condition to be: + // #16 = #0 AND #17 = #1 + // + // This is because the aggregate we install on the right side will map the + // correlated columns to their respective indices as shown. let join_cond = LogOpPred::new( LogOpType::And, - (0..correlated_col_indices.len()) - .map(|i| { + correlated_col_indices + .iter() + .enumerate() + .map(|(i, x)| { assert!(i + left_schema_size < left_schema_size + right_schema_size); BinOpPred::new( - ColumnRefPred::new(i).into_pred_node(), + ColumnRefPred::new(*x).into_pred_node(), ColumnRefPred::new(i + left_schema_size).into_pred_node(), BinOpType::Eq, ) @@ -177,16 +184,18 @@ fn apply_dep_join_past_proj( let cond = join.cond(); let extern_cols = join.extern_cols(); let proj = LogicalProjection::from_plan_node(right.unwrap_plan_node()).unwrap(); + let proj_exprs = proj.exprs(); let right = proj.child(); // TODO: can we have external columns in projection node? I don't think so? // Cross join should always have true cond assert!(cond == ConstantPred::bool(true).into_pred_node()); let left_schema_len = optimizer.get_schema_of(left.clone()).len(); - let right_schema_len = optimizer.get_schema_of(right.clone()).len(); - let right_cols_proj = - (0..right_schema_len).map(|x| ColumnRefPred::new(x + left_schema_len).into_pred_node()); + let right_cols_proj = proj_exprs.to_vec().into_iter().map(|x| { + x.rewrite_column_refs(|col| Some(col + left_schema_len)) + .unwrap() + }); let left_cols_proj = (0..left_schema_len).map(|x| ColumnRefPred::new(x).into_pred_node()); let new_proj_exprs = ListPred::new( @@ -281,7 +290,7 @@ define_rule!( /// talk by Mark Raasveldt. The correlated columns are covered in the original paper. /// /// TODO: the outer join is not implemented yet, so some edge cases won't work. -/// Run SQList tests to catch these, I guess. +/// Run SQLite tests to catch these, I guess. fn apply_dep_join_past_agg( _optimizer: &impl Optimizer, binding: ArcDfPlanNode, @@ -310,15 +319,14 @@ fn apply_dep_join_past_agg( }) .collect::>(); + // We need to group by all correlated columns. + // In our initial distinct step, we installed an agg node that groups by all correlated columns. + // Keeping this in mind, we only need to append a sequential number for each correlated column, + // as these will correspond to the outputs of the agg node. let new_groups = ListPred::new( - groups - .to_vec() - .into_iter() - .map(|x| { - x.rewrite_column_refs(|col| Some(col + correlated_col_indices.len())) - .unwrap() - }) - .chain(correlated_col_indices.iter().map(|x| { + (0..correlated_col_indices.len()) + .map(|x| ColumnRefPred::new(x).into_pred_node()) + .chain(groups.to_vec().into_iter().map(|x| { x.rewrite_column_refs(|col| Some(col + correlated_col_indices.len())) .unwrap() })) diff --git a/optd-sqllogictest/slt/tpch-q17.slt.disabled b/optd-sqllogictest/slt/tpch-q17.slt similarity index 92% rename from optd-sqllogictest/slt/tpch-q17.slt.disabled rename to optd-sqllogictest/slt/tpch-q17.slt index a08de667..d6e658d0 100644 --- a/optd-sqllogictest/slt/tpch-q17.slt.disabled +++ b/optd-sqllogictest/slt/tpch-q17.slt @@ -19,4 +19,4 @@ where l_partkey = p_partkey ); ---- -863.2285714285714285714285714 +863.2285714285715 diff --git a/optd-sqllogictest/slt/unnest-extern-out-of-order.slt b/optd-sqllogictest/slt/unnest-extern-out-of-order.slt new file mode 100644 index 00000000..146d72e3 --- /dev/null +++ b/optd-sqllogictest/slt/unnest-extern-out-of-order.slt @@ -0,0 +1,20 @@ +include _tpch_tables.slt.part + +# A query with a correlated subquery that retrieves columns out of order +# i.e. the extern columns are not of the format [#0, #1, ...] +# This query has extern columns [#1] +query +select + l_orderkey, + l_partkey, + l_extendedprice, + ( + select avg(p_size) + from part + where p_partkey = l_partkey + ) as avg_extendedprice +from lineitem +where l_extendedprice > 55000; +---- +1121 200 55010.00 22.0 +4931 200 55010.00 22.0 diff --git a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql index b6b0bbec..c14291a1 100644 --- a/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql +++ b/optd-sqlplannertest/tests/subqueries/subquery_unnesting.planner.sql @@ -44,7 +44,7 @@ LogicalProjection { exprs: [ #0, #1 ] } └── LogicalAgg ├── exprs:Agg(Sum) │ └── [ Cast { cast_to: Int64, child: #2 } ] - ├── groups: [ #1 ] + ├── groups: [ #0 ] └── LogicalFilter ├── cond:Eq │ ├── #1 @@ -64,7 +64,7 @@ PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=18005,io=3000}, stat: {ro │ └── PhysicalAgg │ ├── aggrs:Agg(Sum) │ │ └── [ Cast { cast_to: Int64, child: #2 } ] - │ ├── groups: [ #1 ] + │ ├── groups: [ #0 ] │ ├── cost: {compute=14000,io=2000} │ ├── stat: {row_cnt=1000} │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=6000,io=2000}, stat: {row_cnt=1000} } @@ -118,7 +118,7 @@ LogicalProjection { exprs: [ #0, #1 ] } └── LogicalAgg ├── exprs:Agg(Sum) │ └── [ Cast { cast_to: Int64, child: #2 } ] - ├── groups: [ #1 ] + ├── groups: [ #0 ] └── LogicalProjection { exprs: [ #0, #1, #2, #3, #4 ] } └── LogicalFilter ├── cond:And @@ -145,7 +145,7 @@ PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=21005,io=4000}, stat: {ro │ └── PhysicalAgg │ ├── aggrs:Agg(Sum) │ │ └── [ Cast { cast_to: Int64, child: #2 } ] - │ ├── groups: [ #1 ] + │ ├── groups: [ #0 ] │ ├── cost: {compute=17000,io=3000} │ ├── stat: {row_cnt=1000} │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #2 ], right_keys: [ #0 ], cost: {compute=9000,io=3000}, stat: {row_cnt=1000} } diff --git a/optd-sqlplannertest/tests/tpch/tpch-01-05.planner.sql b/optd-sqlplannertest/tests/tpch/tpch-01-05.planner.sql index c5edf635..0ee8d3c0 100644 --- a/optd-sqlplannertest/tests/tpch/tpch-01-05.planner.sql +++ b/optd-sqlplannertest/tests/tpch/tpch-01-05.planner.sql @@ -358,7 +358,7 @@ PhysicalLimit { skip: 0(u64), fetch: 100(u64) } └── PhysicalAgg ├── aggrs:Agg(Min) │ └── [ #4 ] - ├── groups: [ #1 ] + ├── groups: [ #0 ] └── PhysicalFilter ├── cond:And │ ├── Eq