Skip to content

Commit

Permalink
fix(optimizer): can't apply pull_up_correlated_predicate_agg_rule wit…
Browse files Browse the repository at this point in the history
…h non-null-propagating expr (#20012)

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Jan 3, 2025
1 parent e08b0f9 commit 6248415
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 37 deletions.
60 changes: 60 additions & 0 deletions e2e_test/batch/aggregate/issue_19835.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
CREATE TABLE foo (
id int
);

statement ok
CREATE TABLE bar (
id int,
foo_id int
);

statement ok
insert into foo values (1),(2);

query I rowsort
select
foo.id, array(
select
1
from bar
where bar.foo_id = foo.id
)
from foo;
----
1 {}
2 {}


query I rowsort
select foo.id, (select count(id) from bar where bar.foo_id = foo.id) from foo;
----
1 0
2 0

query I rowsort
select foo.id, (select avg(id) from bar where bar.foo_id = foo.id) from foo;
----
1 NULL
2 NULL

query I rowsort
select foo.id, (select max(id) from bar where bar.foo_id = foo.id) from foo;
----
1 NULL
2 NULL

query I rowsort
select foo.id, (select coalesce( max(id), 114514) from bar where bar.foo_id = foo.id) from foo;
----
1 114514
2 114514

statement ok
drop table foo;

statement ok
drop table bar;
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@
CREATE TABLE T (A INT, B INT);
CREATE TABLE T2 (C INT, D INT);
SELECT * FROM T
-- count is not null-propagating
WHERE T.A > (SELECT COUNT(*) FROM T2 WHERE B = D);
expected_outputs:
- batch_plan
Expand All @@ -504,10 +505,31 @@
CREATE TABLE T (A INT, B INT);
CREATE TABLE T2 (C INT, D INT);
SELECT * FROM T
-- avg is null-propagating
WHERE T.A > (SELECT avg(c) FROM T2 WHERE B = D);
expected_outputs:
- batch_plan
- stream_plan
- name: a case can't be optimized by PullUpCorrelatedPredicateAggRule
sql: |
CREATE TABLE T (A INT, B INT);
CREATE TABLE T2 (C INT, D INT);
SELECT * FROM T
-- Coalesce is not null-propagating
WHERE T.A > (SELECT coalesce(avg(c), 114514) FROM T2 WHERE B = D);
expected_outputs:
- batch_plan
- stream_plan
- name: a case can't be optimized by PullUpCorrelatedPredicateAggRule
sql: |
CREATE TABLE T (A INT, B INT);
CREATE TABLE T2 (C INT, D INT);
SELECT * FROM T
-- Can apply if any of the conjunction is null
WHERE null AND T.A > (SELECT coalesce(avg(c), 114514) FROM T2 WHERE B = D);
expected_outputs:
- batch_plan
- stream_plan
- name: improve multi scalar subqueries optimization time. issue 16952. case 1.
sql: |
create table t1(a int, b int);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,23 +992,36 @@
select Array(select c from t2 where b = d) arr from t1;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: LeftOuter, predicate: t1.b = t2.d, output: [$expr1] }
└─BatchHashJoin { type: LeftOuter, predicate: t1.b IS NOT DISTINCT FROM t1.b, output: [$expr1] }
├─BatchExchange { order: [], dist: HashShard(t1.b) }
│ └─BatchScan { table: t1, columns: [t1.b], distribution: SomeShard }
└─BatchProject { exprs: [Coalesce(array_agg(t2.c), ARRAY[]:List(Int32)) as $expr1, t2.d] }
└─BatchHashAgg { group_key: [t2.d], aggs: [array_agg(t2.c)] }
└─BatchExchange { order: [], dist: HashShard(t2.d) }
└─BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
└─BatchProject { exprs: [t1.b, Coalesce(array_agg(t2.c) filter(IsNotNull(1:Int32)), ARRAY[]:List(Int32)) as $expr1] }
└─BatchHashAgg { group_key: [t1.b], aggs: [array_agg(t2.c) filter(IsNotNull(1:Int32))] }
└─BatchHashJoin { type: LeftOuter, predicate: t1.b IS NOT DISTINCT FROM t2.d, output: [t1.b, t2.c, 1:Int32] }
├─BatchHashAgg { group_key: [t1.b], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(t1.b) }
│ └─BatchScan { table: t1, columns: [t1.b], distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.d) }
└─BatchProject { exprs: [t2.d, t2.c, 1:Int32] }
└─BatchFilter { predicate: IsNotNull(t2.d) }
└─BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [arr, t1._row_id(hidden), t1.b(hidden), t2.d(hidden)], stream_key: [t1._row_id, t1.b], pk_columns: [t1._row_id, t1.b], pk_conflict: NoCheck }
StreamMaterialize { columns: [arr, t1._row_id(hidden), t1.b(hidden), t1.b#1(hidden)], stream_key: [t1._row_id, t1.b], pk_columns: [t1._row_id, t1.b], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(t1._row_id, t1.b) }
└─StreamHashJoin { type: LeftOuter, predicate: t1.b = t2.d, output: [$expr1, t1._row_id, t1.b, t2.d] }
└─StreamHashJoin { type: LeftOuter, predicate: t1.b IS NOT DISTINCT FROM t1.b, output: [$expr1, t1._row_id, t1.b, t1.b] }
├─StreamExchange { dist: HashShard(t1.b) }
│ └─StreamTableScan { table: t1, columns: [t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamProject { exprs: [Coalesce(array_agg(t2.c), ARRAY[]:List(Int32)) as $expr1, t2.d] }
└─StreamHashAgg { group_key: [t2.d], aggs: [array_agg(t2.c), count] }
└─StreamExchange { dist: HashShard(t2.d) }
└─StreamTableScan { table: t2, columns: [t2.c, t2.d, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamProject { exprs: [t1.b, Coalesce(array_agg(t2.c) filter(IsNotNull(1:Int32)), ARRAY[]:List(Int32)) as $expr1] }
└─StreamHashAgg { group_key: [t1.b], aggs: [array_agg(t2.c) filter(IsNotNull(1:Int32)), count] }
└─StreamHashJoin { type: LeftOuter, predicate: t1.b IS NOT DISTINCT FROM t2.d, output: [t1.b, t2.c, 1:Int32, t2._row_id] }
├─StreamProject { exprs: [t1.b], noop_update_hint: true }
│ └─StreamHashAgg { group_key: [t1.b], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t1.b) }
│ └─StreamTableScan { table: t1, columns: [t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.d) }
└─StreamProject { exprs: [t2.d, t2.c, 1:Int32, t2._row_id] }
└─StreamFilter { predicate: IsNotNull(t2.d) }
└─StreamTableScan { table: t2, columns: [t2.c, t2.d, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
- name: correlated array subquery \du
sql: |
SELECT r.rolname, r.rolsuper, r.rolinherit,
Expand All @@ -1027,18 +1040,32 @@
BatchExchange { order: [rw_users.name ASC], dist: Single }
└─BatchProject { exprs: [rw_users.name, rw_users.is_super, true:Boolean, rw_users.create_user, rw_users.create_db, rw_users.can_login, -1:Int32, null:Timestamptz, $expr1, true:Boolean, true:Boolean] }
└─BatchSort { order: [rw_users.name ASC] }
└─BatchHashJoin { type: LeftOuter, predicate: rw_users.id = null:Int32, output: all }
└─BatchHashJoin { type: LeftOuter, predicate: rw_users.id IS NOT DISTINCT FROM rw_users.id, output: all }
├─BatchExchange { order: [], dist: HashShard(rw_users.id) }
│ └─BatchFilter { predicate: Not(RegexpEq(rw_users.name, '^pg_':Varchar)) }
│ └─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name, rw_users.is_super, rw_users.create_db, rw_users.create_user, rw_users.can_login], distribution: Single }
└─BatchProject { exprs: [Coalesce(array_agg(rw_users.name), ARRAY[]:List(Varchar)) as $expr1, null:Int32] }
└─BatchHashAgg { group_key: [null:Int32], aggs: [array_agg(rw_users.name)] }
└─BatchExchange { order: [], dist: HashShard(null:Int32) }
└─BatchHashJoin { type: Inner, predicate: null:Int32 = rw_users.id, output: [rw_users.name, null:Int32] }
├─BatchExchange { order: [], dist: HashShard(null:Int32) }
│ └─BatchValues { rows: [] }
└─BatchExchange { order: [], dist: HashShard(rw_users.id) }
└─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name], distribution: Single }
└─BatchProject { exprs: [rw_users.id, Coalesce(array_agg(rw_users.name) filter(IsNotNull(1:Int32)), ARRAY[]:List(Varchar)) as $expr1] }
└─BatchHashAgg { group_key: [rw_users.id], aggs: [array_agg(rw_users.name) filter(IsNotNull(1:Int32))] }
└─BatchHashJoin { type: LeftOuter, predicate: rw_users.id IS NOT DISTINCT FROM rw_users.id, output: [rw_users.id, rw_users.name, 1:Int32] }
├─BatchHashAgg { group_key: [rw_users.id], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(rw_users.id) }
│ └─BatchProject { exprs: [rw_users.id] }
│ └─BatchFilter { predicate: Not(RegexpEq(rw_users.name, '^pg_':Varchar)) }
│ └─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name], distribution: Single }
└─BatchExchange { order: [], dist: HashShard(rw_users.id) }
└─BatchProject { exprs: [rw_users.id, rw_users.name, 1:Int32] }
└─BatchHashJoin { type: Inner, predicate: null:Int32 = rw_users.id, output: [rw_users.id, rw_users.name] }
├─BatchExchange { order: [], dist: HashShard(null:Int32) }
│ └─BatchProject { exprs: [rw_users.id, null:Int32] }
│ └─BatchNestedLoopJoin { type: Inner, predicate: true, output: all }
│ ├─BatchExchange { order: [], dist: Single }
│ │ └─BatchHashAgg { group_key: [rw_users.id], aggs: [] }
│ │ └─BatchExchange { order: [], dist: HashShard(rw_users.id) }
│ │ └─BatchValues { rows: [] }
│ └─BatchFilter { predicate: false:Boolean }
│ └─BatchValues { rows: [] }
└─BatchExchange { order: [], dist: HashShard(rw_users.id) }
└─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name], distribution: Single }
- name: correlated array subquery (issue 14423)
sql: |
CREATE TABLE array_types ( x BIGINT[] );
Expand Down Expand Up @@ -1069,6 +1096,7 @@
CREATE TABLE T (A INT, B INT);
CREATE TABLE T2 (C INT, D INT);
SELECT * FROM T
-- count is not null-propagating
WHERE T.A > (SELECT COUNT(*) FROM T2 WHERE B = D);
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand Down Expand Up @@ -1109,6 +1137,7 @@
CREATE TABLE T (A INT, B INT);
CREATE TABLE T2 (C INT, D INT);
SELECT * FROM T
-- avg is null-propagating
WHERE T.A > (SELECT avg(c) FROM T2 WHERE B = D);
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -1132,6 +1161,81 @@
└─StreamHashAgg { group_key: [t2.d], aggs: [sum(t2.c), count(t2.c), count] }
└─StreamExchange { dist: HashShard(t2.d) }
└─StreamTableScan { table: t2, columns: [t2.c, t2.d, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
- name: a case can't be optimized by PullUpCorrelatedPredicateAggRule
sql: |
CREATE TABLE T (A INT, B INT);
CREATE TABLE T2 (C INT, D INT);
SELECT * FROM T
-- Coalesce is not null-propagating
WHERE T.A > (SELECT coalesce(avg(c), 114514) FROM T2 WHERE B = D);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: t.b IS NOT DISTINCT FROM t.b AND ($expr1 > $expr2), output: [t.a, t.b] }
├─BatchExchange { order: [], dist: HashShard(t.b) }
│ └─BatchProject { exprs: [t.a, t.b, t.a::Decimal as $expr1] }
│ └─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
└─BatchProject { exprs: [t.b, Coalesce((sum(t2.c)::Decimal / count(t2.c)::Decimal), 114514:Decimal) as $expr2] }
└─BatchHashAgg { group_key: [t.b], aggs: [sum(t2.c), count(t2.c)] }
└─BatchHashJoin { type: LeftOuter, predicate: t.b IS NOT DISTINCT FROM t2.d, output: [t.b, t2.c] }
├─BatchHashAgg { group_key: [t.b], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(t.b) }
│ └─BatchScan { table: t, columns: [t.b], distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.d) }
└─BatchProject { exprs: [t2.d, t2.c] }
└─BatchFilter { predicate: IsNotNull(t2.d) }
└─BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [a, b, t._row_id(hidden), t.b(hidden)], stream_key: [t._row_id, b], pk_columns: [t._row_id, b], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.a, t.b, t._row_id, t.b] }
└─StreamFilter { predicate: ($expr1 > $expr2) }
└─StreamHashJoin { type: Inner, predicate: t.b IS NOT DISTINCT FROM t.b, output: all }
├─StreamExchange { dist: HashShard(t.b) }
│ └─StreamProject { exprs: [t.a, t.b, t.a::Decimal as $expr1, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProject { exprs: [t.b, Coalesce((sum(t2.c)::Decimal / count(t2.c)::Decimal), 114514:Decimal) as $expr2] }
└─StreamHashAgg { group_key: [t.b], aggs: [sum(t2.c), count(t2.c), count] }
└─StreamHashJoin { type: LeftOuter, predicate: t.b IS NOT DISTINCT FROM t2.d, output: [t.b, t2.c, t2._row_id] }
├─StreamProject { exprs: [t.b], noop_update_hint: true }
│ └─StreamHashAgg { group_key: [t.b], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t.b) }
│ └─StreamTableScan { table: t, columns: [t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(t2.d) }
└─StreamProject { exprs: [t2.d, t2.c, t2._row_id] }
└─StreamFilter { predicate: IsNotNull(t2.d) }
└─StreamTableScan { table: t2, columns: [t2.c, t2.d, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
- name: a case can't be optimized by PullUpCorrelatedPredicateAggRule
sql: |
CREATE TABLE T (A INT, B INT);
CREATE TABLE T2 (C INT, D INT);
SELECT * FROM T
-- Can apply if any of the conjunction is null
WHERE null AND T.A > (SELECT coalesce(avg(c), 114514) FROM T2 WHERE B = D);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: t.b = t2.d AND ($expr1 > $expr2), output: [t.a, t.b] }
├─BatchExchange { order: [], dist: HashShard(t.b) }
│ └─BatchProject { exprs: [t.a, t.b, t.a::Decimal as $expr1] }
│ └─BatchFilter { predicate: null:Boolean }
│ └─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
└─BatchProject { exprs: [Coalesce((sum(t2.c)::Decimal / count(t2.c)::Decimal), 114514:Decimal) as $expr2, t2.d] }
└─BatchHashAgg { group_key: [t2.d], aggs: [sum(t2.c), count(t2.c)] }
└─BatchExchange { order: [], dist: HashShard(t2.d) }
└─BatchFilter { predicate: null:Boolean }
└─BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [a, b, t._row_id(hidden), t2.d(hidden)], stream_key: [t._row_id, b], pk_columns: [t._row_id, b], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.a, t.b, t._row_id, t2.d] }
└─StreamFilter { predicate: ($expr1 > $expr2) }
└─StreamHashJoin { type: Inner, predicate: t.b = t2.d, output: all }
├─StreamExchange { dist: HashShard(t.b) }
│ └─StreamProject { exprs: [t.a, t.b, t.a::Decimal as $expr1, t._row_id] }
│ └─StreamFilter { predicate: null:Boolean }
│ └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProject { exprs: [Coalesce((sum(t2.c)::Decimal / count(t2.c)::Decimal), 114514:Decimal) as $expr2, t2.d] }
└─StreamHashAgg { group_key: [t2.d], aggs: [sum(t2.c), count(t2.c), count] }
└─StreamExchange { dist: HashShard(t2.d) }
└─StreamFilter { predicate: null:Boolean }
└─StreamTableScan { table: t2, columns: [t2.c, t2.d, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
- name: improve multi scalar subqueries optimization time. issue 16952. case 1.
sql: |
create table t1(a int, b int);
Expand Down
Loading

0 comments on commit 6248415

Please sign in to comment.