Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: materialize the result to handle non-deterministic functions #24

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions rfcs/0024-nondeterministic-functions.md
xxchan marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

@xxchan xxchan Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just came up with a problem: IIRC currently UDF is just an expression, which can be used in other places beside Project, e.g., Filter. Should we make them occur only in Project? Besides the determinism problem, there are also optimizations like risingwavelabs/risingwave#8703

cc @wangrunji0408 @TennyZhuang

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, https://github.com/risingwavelabs/rfcs/pull/12/files#diff-135315ddd7b7929c8daf439f35043ba3c168ce620ab72c56740a6055d15e8f9fR141 refused to add a new operator, but it seems we eventually need to add some 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Firstly, I need to clarify that the "UDF" and "non-deterministic expression" are independent concepts. PROC_TIME() and RAND() are built-in expressions but they are non-deterministic. A user-defined function can also be deterministic if a user makes a contract with us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And materialized Project is just used for non-deterministic expression on an "updatable stream". We can use a normal project on the append-only stream for non-deterministic expression too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally agree to refuse to add a new operator for UDF or async expression but not for non-deterministic expression. In another word, if we did add a "UDFProject", now we maybe have to accept StreamProject, StreamAsyncProject, StreamMaterializeProject, StreamAsyncMaterializeProject 😅 🥵

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, after thinking a while I feel that although conceptually different, practically they cannot be so cleanly separated 😅:

Since we want to "always materializing UDF" (except for rare cases and used with caution), MaterializeProject would become the main executor for UDFs. Optimizations for UDF in Project like risingwavelabs/risingwave#8703 would be in vain. On the other hand, do you think we should optimize MaterializeProject for async execution?

Copy link
Member

@xxchan xxchan Mar 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I'm wrong again😇😇😇. I forgot append-only stream. But the points might still apply

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all the optimization for async execution should be done behind the expression framework 🤔 Well there also are some executor-level optimization. For example, some users might not care about the order of the records on the append-only stream and we might reorder them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the optimization for async execution should be done behind the expression framework

Is that enough? If so we can only tweak the performance for expr on one chunk, but not amoung chunks, like buffered in https://github.com/risingwavelabs/risingwave/pull/8703/files

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, actually I don’t mind adding many different executor variants in the future, just like the TopN variants 😅

Let’s conclude the discussion.

xxchan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
---
feature: non_deterministic_functions
authors:
- "st1page"
start_date: "2022/11/21"
---

# nondeterministic-functions

## Summary

We introduce the `MaterializeProject` stream executor to handle non-deterministic function on updatable stream.
xxchan marked this conversation as resolved.
Show resolved Hide resolved

## Motivation

In the discussion about the udf, @skyzh raise the issue about the non-deterministic function on updatable stream <https://github.com/risingwavelabs/rfcs/pull/12#issuecomment-1305105614>. This issue also appears on the built-in functions such as `random(),` `proc_time()` or `now()`.
xxchan marked this conversation as resolved.
Show resolved Hide resolved

## Design

We will introduce a `MaterializeProject`, it will materialize some **partial** columns of result with the stream key of the input as the primiary key. When an `Insert` operation comes, it will compute the result and materialize some columns. when `Update` or `Delete` comes, it will lookup its state to replace the old value of the operation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Find a problem: not all expressions can be extracted to a Project easily, e.g., in FILTER clause. 🤡

This is mentioned in #12 as a reason against a dedicated operator for UDF.

For example, if we have a materialized view with SQL `select random() as rd, v*2 as vv, pk from t` and the pk is the primary key of the table t. The stream processing plan will be like:

```plain
StreamMaterialize(columns:[rd, vv, pk])
StreamMaterializeProject(
exprs: [Random(), InputRef(0) * 2, InputRef(1)],
materialized_cols: [0],
state_table: [pk, rd]
st1page marked this conversation as resolved.
Show resolved Hide resolved
)
StreamScan(fields:[v, pk])
```

The `StreamMaterializeProject` will materialize the first columns of its input which is the `random()` function's result. When we have a serial of changes `Insert(v = 10, pk = 1), Update((v = 10, pk = 1) -> (v = 20, pk = 1)), Delete(v = 20, pk = 1)` the `StreamMaterializeProject` will handle it as following:
st1page marked this conversation as resolved.
Show resolved Hide resolved

1. receive `Insert(v = 10, pk = 1)`
- comput the result `(rd = 111, v = 20, pk = 1)`
- insert into state table `(pk = 1, rd = 111)`
- output `Insert(rd = 111, v = 20, pk = 1)`

2. receive `Update((v = 10, pk = 1) -> (v = 20, pk = 1))`
- compute the old value's project result `(rd = 55555, v = 20, pk = 1)`
- get the old values `(pk = 1, rd = 111)` from state table modify the result old value to `(rd = 111, v = 10, pk = 1)`
xxchan marked this conversation as resolved.
Show resolved Hide resolved
- compute the new value `(rd = 444, v = 30, pk = 1)`
- insert into state table `(pk = 1, rd = 444)`
- output `Update((rd = 111, v = 20, pk = 1) -> (rd = 444,v = 40, pk = 1))`
xxchan marked this conversation as resolved.
Show resolved Hide resolved
3. receive `Delete(v = 20, pk = 1)`
- compute the old value's project result `(rd = 55555, v = 40, pk = 1)`
- get the old values `(pk = 1, rd = 444)` from state table modify the result old value to `(rd = 444, v = 40, pk = 1)`
- delete state table `(pk = 1)`
- output `Delete(rd = 444,v = 40, pk = 1)`

## Future possibilities

We might can pull-up those `StreamMaterializeProject` to the first `Materialize` executor or other stateful executors. For example, the above example's plan can be

```plain
StreamMaterialize(columns:[rd, vv, pk], flag: check_and_modify)
StreamProject(exprs: [Random(), InputRef(0) * 2, InputRef(1)])
StreamScan(fields:[v, pk])
```