Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): add StreamProjectMaterialize PlanNode #8843

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ message ProjectNode {
repeated uint32 watermark_output_key = 3;
}

message ProjectMaterializedNode {
repeated expr.ExprNode select_list = 1;
// this two field is expressing a list of usize pair, which means when project receives a
// watermark with `watermark_input_key[i]` column index, it should derive a new watermark
// with `watermark_output_key[i]`th expression
repeated uint32 watermark_input_key = 2;
repeated uint32 watermark_output_key = 3;
repeated uint32 materialize_columns_key = 4;
catalog.Table table = 5;
}

message FilterNode {
expr.ExprNode search_condition = 1;
}
Expand Down Expand Up @@ -565,6 +576,7 @@ message StreamNode {
GroupTopNNode append_only_group_top_n = 130;
TemporalJoinNode temporal_join = 131;
BarrierRecvNode barrier_recv = 132;
ProjectMaterializedNode project_materialized = 133;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
19 changes: 2 additions & 17 deletions src/frontend/src/optimizer/plan_node/logical_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,23 +270,8 @@ impl ToStream for LogicalProject {
let new_input = self
.input()
.to_stream_with_dist_required(&input_required, ctx)?;
let new_logical = self.clone_with_input(new_input.clone());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for review: we already do project merge rule

let stream_plan = if let Some(input_proj) = new_input.as_stream_project() {
let outer_project = new_logical;
let inner_project = input_proj.as_logical();
let mut subst = Substitute {
mapping: inner_project.exprs().clone(),
};
let exprs = outer_project
.exprs()
.iter()
.cloned()
.map(|expr| subst.rewrite_expr(expr))
.collect();
StreamProject::new(LogicalProject::new(inner_project.input(), exprs))
} else {
StreamProject::new(new_logical)
};
let new_logical = self.clone_with_input(new_input);
let stream_plan = StreamProject::new(new_logical);
required_dist.enforce_if_not_satisfies(stream_plan.into(), &Order::any())
}

Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ mod stream_local_simple_agg;
mod stream_materialize;
mod stream_now;
mod stream_project;
mod stream_project_materialized;
mod stream_project_set;
mod stream_row_id_gen;
mod stream_sink;
Expand Down Expand Up @@ -736,6 +737,7 @@ pub use stream_local_simple_agg::StreamLocalSimpleAgg;
pub use stream_materialize::StreamMaterialize;
pub use stream_now::StreamNow;
pub use stream_project::StreamProject;
pub use stream_project_materialized::StreamProjectMaterialized;
pub use stream_project_set::StreamProjectSet;
pub use stream_row_id_gen::StreamRowIdGen;
pub use stream_share::StreamShare;
Expand Down Expand Up @@ -833,6 +835,7 @@ macro_rules! for_all_plan_nodes {
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
, { Stream, ProjectMaterialized }
, { Stream, GroupTopN }
, { Stream, Union }
, { Stream, RowIdGen }
Expand Down Expand Up @@ -933,6 +936,7 @@ macro_rules! for_stream_plan_nodes {
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
, { Stream, ProjectMaterialized }
, { Stream, GroupTopN }
, { Stream, Union }
, { Stream, RowIdGen }
Expand Down
213 changes: 213 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_project_materialized.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{FieldDisplay, Schema};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::ProjectMaterializedNode;

use super::generic::GenericPlanRef;
use super::stream::StreamPlanRef;
use super::utils::TableCatalogBuilder;
use super::{ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{try_derive_watermark, Expr, ExprImpl, ExprRewriter};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;

/// `StreamProjectMaterialized` implements [`super::LogicalProject`] to evaluate specified
/// expressions on input rows. and additionally, materializes the `materialize_columns`
/// which is a concatenation of the node's PK with the non-pk `forced_materialize_columns`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamProjectMaterialized {
pub base: PlanBase,
logical: LogicalProject,
/// All the watermark derivations, (input_column_index, output_column_index). And the
/// derivation expression is the project's expression itself.
watermark_derivations: Vec<(usize, usize)>,
/// Materialize columns will contain all the
materialize_columns: Vec<usize>,
}

impl fmt::Display for StreamProjectMaterialized {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut builder = f.debug_struct("StreamProjectMaterialized");
self.logical.fmt_fields_with_builder(&mut builder);
if !self.watermark_derivations.is_empty() {
builder.field(
"output_watermarks",
&self
.watermark_derivations
.iter()
.map(|(_, idx)| FieldDisplay(self.schema().fields.get(*idx).unwrap()))
.collect_vec(),
);
};
builder.finish()
}
}

impl StreamProjectMaterialized {
pub fn new(logical: LogicalProject, forced_materialize_columns: Vec<usize>) -> Self {
let ctx = logical.base.ctx.clone();
let input = logical.input();
let pk_indices = logical.base.logical_pk.to_vec();
let schema = logical.schema().clone();

let distribution = logical
.i2o_col_mapping()
.rewrite_provided_distribution(input.distribution());

let mut watermark_derivations = vec![];
let mut watermark_columns = FixedBitSet::with_capacity(schema.len());
for (expr_idx, expr) in logical.exprs().iter().enumerate() {
if let Some(input_idx) = try_derive_watermark(expr) {
if input.watermark_columns().contains(input_idx) {
watermark_derivations.push((input_idx, expr_idx));
watermark_columns.insert(expr_idx);
}
}
}
let mut materialize_columns = pk_indices.clone();
for idx in forced_materialize_columns {
if !materialize_columns.contains(&idx) {
materialize_columns.push(idx);
}
}
// Project executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
let base = PlanBase::new_stream(
ctx,
schema,
pk_indices,
logical.functional_dependency().clone(),
distribution,
logical.input().append_only(),
watermark_columns,
);
StreamProjectMaterialized {
base,
logical,
watermark_derivations,
materialize_columns,
}
}

pub fn as_logical(&self) -> &LogicalProject {
&self.logical
}

pub fn exprs(&self) -> &Vec<ExprImpl> {
self.logical.exprs()
}
}

impl PlanTreeNodeUnary for StreamProjectMaterialized {
fn input(&self) -> PlanRef {
self.logical.input()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(
self.logical.clone_with_input(input),
self.materialize_columns.clone(),
)
}
}
impl_plan_tree_node_for_unary! {StreamProjectMaterialized}

impl StreamNode for StreamProjectMaterialized {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
// This is an artificial schema: [logical_pk, non_pk_materialize_columns]
let schema = Schema {
fields: self
.materialize_columns
.iter()
.map(|i| self.schema().fields[*i].clone())
.collect(),
};
let mut index = vec![None; self.schema().len()];
self.logical
.logical_pk()
.iter()
.enumerate()
.for_each(|(idx, pk)| {
index[*pk] = Some(idx);
});
let col_to_schema_mapping = ColIndexMapping::new(index);
let mut internal_table_catalog_builder =
TableCatalogBuilder::new(self.base.ctx().with_options().internal_table_subset());
schema.fields().iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
});
(0..self.logical.logical_pk().len()).for_each(|i| {
internal_table_catalog_builder.add_order_column(i, OrderType::ascending())
});
let dist_keys = self
.base
.distribution()
.dist_column_indices()
.iter()
// Distribution keys must be contained in logical_pk
.map(|i| col_to_schema_mapping.map(*i))
.collect();

let table_catalog = internal_table_catalog_builder
.build(dist_keys, 0)
.with_id(state.gen_table_id_wrapped());

PbNodeBody::ProjectMaterialized(ProjectMaterializedNode {
select_list: self
.logical
.exprs()
.iter()
.map(|x| x.to_expr_proto())
.collect(),
watermark_input_key: self
.watermark_derivations
.iter()
.map(|(x, _)| *x as u32)
.collect(),
watermark_output_key: self
.watermark_derivations
.iter()
.map(|(_, y)| *y as u32)
.collect(),
materialize_columns_key: self.materialize_columns.iter().map(|y| *y as u32).collect(),
table: Some(table_catalog.to_internal_table_prost()),
})
}
}

impl ExprRewritable for StreamProjectMaterialized {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_project()
.unwrap()
.clone(),
self.materialize_columns.clone(),
)
.into()
}
}
3 changes: 2 additions & 1 deletion src/stream/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ macro_rules! build_executor {
<$data_type>::new_boxed_executor($source, node, $store, $stream).await
},
)*
NodeBody::Exchange(_) | NodeBody::DeltaIndexJoin(_) => unreachable!()
NodeBody::Exchange(_) | NodeBody::DeltaIndexJoin(_) => unreachable!(),
NodeBody::ProjectMaterialized(_) => unimplemented!(),
}
}
}
Expand Down