Skip to content

Commit

Permalink
feat(flow): plan def (#3490)
Browse files Browse the repository at this point in the history
* feat: plan def

* chore: add license

* docs: remove TODO done

* chore: add derive Ord
  • Loading branch information
discord9 authored Mar 12, 2024
1 parent 9aa8f75 commit 58bd065
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/flow/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ mod scalar;
pub(crate) use error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
pub(crate) use id::{GlobalId, Id, LocalId};
pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
pub(crate) use relation::{AggregateExpr, AggregateFunc};
pub(crate) use scalar::ScalarExpr;
4 changes: 2 additions & 2 deletions src/flow/src/expr/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::repr::{self, value_to_internal_ts, Diff, Row};
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct MapFilterProject {
/// A sequence of expressions that should be appended to the row.
///
Expand Down Expand Up @@ -415,7 +415,7 @@ impl MapFilterProject {
}

/// A wrapper type which indicates it is safe to simply evaluate all expressions.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SafeMfpPlan {
pub(crate) mfp: MapFilterProject,
}
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/expr/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod accum;
mod func;

/// Describes an aggregation expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct AggregateExpr {
/// Names the aggregation function.
pub func: AggregateFunc,
Expand Down
1 change: 1 addition & 0 deletions src/flow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
// allow unused for now because it will be used later
mod adapter;
mod expr;
mod plan;
mod repr;
98 changes: 98 additions & 0 deletions src/flow/src/plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the basic definitions of a dataflow's plan,
//! which can be translated to a hydro dataflow
mod join;
mod reduce;

use serde::{Deserialize, Serialize};

pub(crate) use self::reduce::{AccumulablePlan, KeyValPlan, ReducePlan};
use crate::expr::{
AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
};
use crate::plan::join::JoinPlan;
use crate::repr::{DiffRow, RelationType};

/// A [`Plan`] stored together with the [`RelationType`] recorded as its output type.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub struct TypedPlan {
/// output type of the relation
pub typ: RelationType,
/// The plan itself.
/// NOTE(review): presumably `typ` describes the rows this plan produces — confirm.
pub plan: Plan,
}

/// The operators a dataflow plan can be built from.
///
/// The type is recursive: most variants hold their input plan(s) as
/// `Box<Plan>` or `Vec<Plan>`.
///
/// `Ord`/`PartialOrd` are derived, so comparison follows the declaration
/// order of the variants and of the fields inside each variant; reordering
/// either changes comparison results.
///
/// TODO(discord9): support `TableFunc`(by define FlatMap that map 1 to n)
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub enum Plan {
/// A constant collection of rows, given as a list of `DiffRow`s.
Constant { rows: Vec<DiffRow> },
/// Get CDC data from a source, be it an external reference to an existing source or an
/// internal reference to a `Let` identifier. The source is named by `id`.
Get { id: Id },
/// Create a temporary collection from the given `value`, and make this binding only
/// available in the scope of `body`
Let {
/// Local identifier the temporary collection is bound to.
id: LocalId,
/// Plan whose result becomes the temporary collection.
value: Box<Plan>,
/// Plan in whose scope the binding is available.
body: Box<Plan>,
},
/// Map, Filter, and Project operators.
Mfp {
/// The input collection.
input: Box<Plan>,
/// Linear operator to apply to each record.
mfp: MapFilterProject,
},
/// Reduce operator, aggregation by key assembled from KeyValPlan
Reduce {
/// The input collection.
input: Box<Plan>,
/// A plan for changing input records into key, value pairs.
key_val_plan: KeyValPlan,
/// A plan for performing the reduce.
///
/// The implementation of reduction has several different strategies based
/// on the properties of the reduction, and the input itself.
reduce_plan: ReducePlan,
},
/// A multiway relational equijoin, with fused map, filter, and projection.
///
/// This stage performs a multiway join among `inputs`, using the equality
/// constraints expressed in `plan`. The plan also describes the implementation
/// strategy we will use, and any pushed down per-record work.
Join {
/// An ordered list of inputs that will be joined.
inputs: Vec<Plan>,
/// Detailed information about the implementation of the join.
///
/// This includes information about the implementation strategy, but also
/// any map, filter, project work that we might follow the join with, but
/// potentially pushed down into the implementation of the join.
plan: JoinPlan,
},
/// Adds the contents of the input collections.
///
/// Importantly, this is *multiset* union, so the multiplicities of records will
/// add. This is in contrast to *set* union, where the multiplicities would be
/// capped at one. A set union can be formed with `Union` followed by `Reduce`
/// implementing the "distinct" operator.
Union {
/// The input collections
inputs: Vec<Plan>,
/// Whether to consolidate the output, e.g., cancel negated records.
consolidate_output: bool,
},
}
78 changes: 78 additions & 0 deletions src/flow/src/plan/join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

use crate::expr::ScalarExpr;
use crate::plan::SafeMfpPlan;

/// The strategy used to implement a join. Only a linear join exists so far.
///
/// TODO(discord9): consider impl more join strategies
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum JoinPlan {
/// A linear join, described by a [`LinearJoinPlan`].
Linear(LinearJoinPlan),
}

/// Determines whether a given row should stay in the output, and applies a
/// map-filter-project to the row before it is output.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct JoinFilter {
/// Groups of expressions that must agree with each other: for each inner vector,
/// all of its expressions must evaluate to the same value, otherwise the row is
/// filtered out. Useful for equi-joins (joins based on equality of some columns).
pub ready_equivalences: Vec<Vec<ScalarExpr>>,
/// A map-filter-project to apply before the row is output.
pub before: SafeMfpPlan,
}

/// A plan for the execution of a linear join.
///
/// A linear join is a sequence of stages, each of which introduces
/// a new collection. Each stage is represented by a [`LinearStagePlan`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinearJoinPlan {
/// The source relation from which we start the join.
/// NOTE(review): presumably an index into the join's list of inputs — confirm.
pub source_relation: usize,
/// The arrangement to use for the source relation, if any
pub source_key: Option<Vec<ScalarExpr>>,
/// An initial closure to apply before any stages.
///
/// Values of `None` indicate the identity closure.
pub initial_closure: Option<JoinFilter>,
/// A *sequence* of stages to apply one after the other.
pub stage_plans: Vec<LinearStagePlan>,
/// A concluding filter to apply after the last stage.
///
/// Values of `None` indicate the identity closure.
pub final_closure: Option<JoinFilter>,
}

/// A plan for the execution of one stage of a linear join.
///
/// Each stage is a binary join between the current accumulated
/// join results, and a new collection. The former is referred to
/// as the "stream" and the latter the "lookup".
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinearStagePlan {
/// The index of the relation into which we will look up.
pub lookup_relation: usize,
/// The key expressions to use for the stream relation.
pub stream_key: Vec<ScalarExpr>,
/// Columns to retain from the stream relation.
/// These columns are those that are not redundant with `stream_key`,
/// and cannot be read out of the key component of an arrangement.
pub stream_thinning: Vec<usize>,
/// The key expressions to use for the lookup relation.
pub lookup_key: Vec<ScalarExpr>,
/// The closure to apply to the concatenation of the key columns,
/// the stream value columns, and the lookup value columns.
pub closure: JoinFilter,
}
50 changes: 50 additions & 0 deletions src/flow/src/plan/reduce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

use crate::expr::{AggregateExpr, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr};

/// A pair of [`SafeMfpPlan`]s, one for the key and one for the value.
///
/// NOTE(review): presumably each plan is evaluated against an input record to
/// produce the key and the value of a (key, value) pair for a reduction — confirm
/// against the code that consumes this type.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub struct KeyValPlan {
/// Plan for the key part.
pub key_plan: SafeMfpPlan,
/// Plan for the value part.
pub val_plan: SafeMfpPlan,
}

/// The strategy used to perform a reduction.
///
/// TODO(discord9): def&impl of Hierarchical aggregates(for min/max with support to deletion) and
/// basic aggregates(for other aggregate functions) and mixed aggregate
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub enum ReducePlan {
/// Plan for not computing any aggregations, just determining the set of
/// distinct keys.
Distinct,
/// Plan for computing only accumulable aggregations.
/// Including simple functions like `sum`, `count`, `min/max`(without deletion)
Accumulable(AccumulablePlan),
}

/// Accumulable plan for the execution of a reduction.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub struct AccumulablePlan {
/// All of the aggregations we were asked to compute, stored
/// in order.
pub full_aggrs: Vec<AggregateExpr>,
/// All of the non-distinct accumulable aggregates.
/// Each element represents:
/// (index of aggr output, index of value among inputs, aggr expr)
/// These will all be rendered together in one dataflow fragment.
pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
/// All of the `DISTINCT` accumulable aggregations, using the same
/// element layout as `simple_aggrs`:
/// (index of aggr output, index of value among inputs, aggr expr)
pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}

0 comments on commit 58bd065

Please sign in to comment.