RFC 001: WASM UDFs #1
Conversation
text/001-wasm-udfs.md (outdated)
pointer and length in bytes of the segment to the function as arguments. Unfortunately, only a single value may be returned by functions, so this method only works for passing input.
1. _Data serialization format_. Complex data types such as `DATE` or `TEXT`
I think the evaluation criteria and the "graduation exam" for any UDF interface (this covers argument/output serialization as well as any boilerplate "wrapper" code we need to put around the guest function) would be the ability to:
...emit and receive Arrow data types like lists or Structs
They're not currently supported by DataFusion directly (it doesn't have support in SQL for building a Struct or a List, but can likely pass those from a Parquet file to the caller), but they can be useful in analytics:
- aggregation functions like `array_agg` can output a List
- JSON objects can be modelled as a Struct (though Structs have a fixed schema in Parquet/Arrow; I think Snowflake implements the VARIANT data type using those + some query planning tricks to merge schemas of disparate Structs, but my source for Snowflake using Parquet for storage is a single tweet, so I could be wrong)
...implement UDAFs
splitgraph/seafowl#105. These are mentioned in this RFC, but they differ from UDFs in that they're supposed to store state across batches (e.g. if I'm computing a sum, I need to keep the current running total in memory): https://docs.rs/datafusion/latest/datafusion/physical_plan/udaf/struct.AggregateUDF.html
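To make the state requirement concrete, here's a hedged sketch of such a running-total accumulator against DataFusion's Accumulator trait (method signatures have shifted between DataFusion versions, so treat this as illustrative rather than exact):

```rust
use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::error::Result;
use datafusion::physical_plan::Accumulator;
use datafusion::scalar::ScalarValue;

#[derive(Debug)]
struct SumAccumulator {
    // State that must survive across batches -- this is exactly what a
    // WASM UDAF harness would have to persist between guest invocations.
    sum: f64,
}

impl Accumulator for SumAccumulator {
    // Called once per record batch; `self.sum` carries over between calls.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let arr = values[0]
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("expected a Float64 column");
        self.sum += arr.iter().flatten().sum::<f64>();
        Ok(())
    }

    // Merges partial states produced by parallel partitions.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        self.update_batch(states)
    }

    // The intermediate "accumulation state": this is what would have to
    // cross the host/guest boundary in a WASM-based UDAF.
    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::Float64(Some(self.sum))])
    }

    fn evaluate(&self) -> Result<ScalarValue> {
        Ok(ScalarValue::Float64(Some(self.sum)))
    }
}
```

Whatever WASM interface we pick has to give the guest a way to hold (or round-trip) the equivalent of `sum` between calls.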
...implement window and table-valued functions
Those aren't currently doable in DataFusion but we should keep those in mind. Interface-wise, a table-valued function will be able to return a tuple (or a vector of tuples, i.e. a full "slice/batch" of a table). I haven't researched window functions, but I think being able to maintain some state and accept/return a slice of a table should cover that.
text/001-wasm-udfs.md (outdated)
transformation prevents us from using the original serialized Arrow data as originally received. Theoretically, it could be possible to transpose the columnar input to row-tuples but keep the serialization of individual scalar values, but this seems a lot more complex than what it's worth.
I want to explain/dig deeper into the Arrow IPC / Flight / direct Arrow array copy method for future reference.
Current proposal
To sum up what the current proposal looks like:
- The interface that DataFusion gives us (as a DataFusion UD(aggregation,window,table)F) is:
- accept a vector of ColumnarValues (close to being a set of pointers to Arrow arrays with extra DataFusion metadata in the Seafowl process' heap memory)
- return a ColumnarValue (an Arrow array with results from this batch)
- for some UDF flavours, we might need to return a vector of ColumnarValues or return more than one vector (e.g. the aggregate UDF "accumulation state")
- We transpose the vector of ColumnarValues (column-oriented) into an iterator of tuples (row-oriented)
- We use msgpack to serialize/deserialize these into a language-independent binary blob
- We use WASI stdin/stdout to pass/receive the binary blob to the guest
- The guest function can be written in any language that:
- compiles to wasm-wasi
- can read/write to stdin/stdout
- can serialize/deserialize msgpack
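To make this concrete, here's a minimal sketch of a guest under this proposal, written in Rust and compiled to `wasm32-wasi` (the `(f64, f64)` argument tuple and the use of the `rmp-serde` crate are illustrative assumptions, not a settled wire format):

```rust
use std::io::{self, Read, Write};

fn main() -> io::Result<()> {
    // The host writes one msgpack-encoded argument tuple to stdin per
    // invocation (the current proposal calls the guest once per tuple).
    let mut buf = Vec::new();
    io::stdin().read_to_end(&mut buf)?;

    // Hypothetical two-argument scalar UDF taking (f64, f64).
    let (a, b): (f64, f64) =
        rmp_serde::from_slice(&buf).expect("malformed msgpack input");

    // The actual UDF logic; a placeholder computation here.
    let result = (a + b).ln();

    // Write the msgpack-encoded result back on stdout.
    let out = rmp_serde::to_vec(&result).expect("serialization failed");
    io::stdout().write_all(&out)?;
    Ok(())
}
```

Any language that can do the equivalent of these three steps (read stdin, decode/encode msgpack, write stdout) qualifies, which is the whole appeal of this scheme.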
Arrow proposal
Here's the Arrow proposal as I understand it:
- We copy the Arrow arrays directly from DataFusion's ColumnarValue structs into the WASM linear memory
  - This exploits the fact that the Arrow representation is language-independent (although this might not be true; see the ominous "This design also restricts the C Data Interface to in-process data sharing. For interprocess communication, we recommend use of the Arrow IPC format." note here).
- We pass an array of pointers to those Arrow arrays to the WASM guest function
  - Vaguely, we can pass an array to WASM as an i32 memory offset + i32 length. This is something that might not be trivial to do in a language-independent way.
- The guest function:
- iterates through those arrays using its own copy of the Arrow library (which converts Arrow values to the language-specific representation)
- does the actual computation
- writes the result out to construct a new Arrow array (in the WASM linear memory)
- returns a pointer to this array
- We copy the array out of the WASM memory into Seafowl's heap memory, producing an output ColumnarValue that we return to DataFusion
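For reference, a hedged host-side sketch of that round trip using wasmtime; the exported `alloc`/`udf` names and the i32 pointer+length ABI are assumptions for illustration, not a settled interface:

```rust
use wasmtime::{Engine, Instance, Module, Store};

// Write a serialized Arrow array into the guest's linear memory and
// call an exported function with (offset, length); the guest returns a
// pointer to its output array in the same memory.
fn call_guest(wasm_bytes: &[u8], arrow_buf: &[u8]) -> anyhow::Result<i32> {
    let engine = Engine::default();
    let module = Module::new(&engine, wasm_bytes)?;
    let mut store = Store::new(&engine, ());
    let instance = Instance::new(&mut store, &module, &[])?;

    let memory = instance
        .get_memory(&mut store, "memory")
        .expect("guest must export its linear memory");

    // Ask the guest to allocate a region big enough for the input.
    let alloc = instance.get_typed_func::<i32, i32>(&mut store, "alloc")?;
    let offset = alloc.call(&mut store, arrow_buf.len() as i32)?;

    // Copy the Arrow bytes directly into guest memory.
    memory.write(&mut store, offset as usize, arrow_buf)?;

    // Invoke the UDF; it returns a pointer to its output array, which
    // we'd then copy back out into a ColumnarValue.
    let udf = instance.get_typed_func::<(i32, i32), i32>(&mut store, "udf")?;
    let out_ptr = udf.call(&mut store, (offset, arrow_buf.len() as i32))?;
    Ok(out_ptr)
}
```

The matching guest-side `alloc`/`udf` exports are exactly the harness a function writer (or our per-language template) would have to supply, on top of linking Arrow.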
This eliminates the overhead from:
- host-side conversion between Arrow and msgpack (the guest-side deserialization still happens, but it's now from Arrow to the guest-side representation of a value)
- using WASI stdin/stdout to pass an arbitrary binary buffer to the WASM guest (a direct memory copy instead)
- invoking the WASM function once per tuple (it's now invoked once per batch of tuples)
This comes at the cost of requiring the function writer to (statically or dynamically) link to the Arrow library.
For reference, here's what the list of shared libraries looks like in my pyarrow installation (these are x86_64 binaries, but I'm assuming the WASM sizes will have a similar order of magnitude):
$ ls -1sShr *.so*
52K _dataset_orc.cpython-39-x86_64-linux-gnu.so
84K _json.cpython-39-x86_64-linux-gnu.so
96K _feather.cpython-39-x86_64-linux-gnu.so
128K _hdfs.cpython-39-x86_64-linux-gnu.so
128K libarrow_python_flight.so.900
156K _substrait.cpython-39-x86_64-linux-gnu.so
176K _orc.cpython-39-x86_64-linux-gnu.so
200K _hdfsio.cpython-39-x86_64-linux-gnu.so
204K _s3fs.cpython-39-x86_64-linux-gnu.so
204K _gcsfs.cpython-39-x86_64-linux-gnu.so
240K _exec_plan.cpython-39-x86_64-linux-gnu.so
248K _parquet_encryption.cpython-39-x86_64-linux-gnu.so
248K libplasma.so.900
256K _plasma.cpython-39-x86_64-linux-gnu.so
340K _csv.cpython-39-x86_64-linux-gnu.so
340K _dataset_parquet.cpython-39-x86_64-linux-gnu.so
492K _fs.cpython-39-x86_64-linux-gnu.so
516K _parquet.cpython-39-x86_64-linux-gnu.so
844K _dataset.cpython-39-x86_64-linux-gnu.so
1.2M _flight.cpython-39-x86_64-linux-gnu.so
1.2M _compute.cpython-39-x86_64-linux-gnu.so
2.2M libarrow_python.so.900
2.4M libarrow_dataset.so.900
4.0M lib.cpython-39-x86_64-linux-gnu.so
4.4M libarrow_substrait.so.900
9.2M libparquet.so.900
16M libarrow_flight.so.900
59M libarrow.so.900
These are pretty big and interconnected (e.g. the Python-specific lib.cpython-39-x86_64-linux-gnu.so dynamically links to libarrow_python.so.900, which links to libarrow.so.900 and libparquet.so.900). Every function writer would now have to bundle a subset of these with their WASM binary.
One could argue that some of these (e.g. anything that ends with .so.900) are language-independent and can be dynamically linked at runtime. However:
- There's still some language-specific code the function will need to ship (intuitively, because the way C represents a datetime in memory, and thus the most convenient way to work with datetimes in that language, will not necessarily be the same as in AssemblyScript / Rust). In the Python case, it looks like these are lib.cpython-39-x86_64-linux-gnu.so and libarrow_python.so.900.
. - Dynamic linking in WASM is possible (I haven't done much research), but will likely:
- require us to limit the number of supported languages, instead of stating that "you can write a function in anything that compiles to WASM and supports MessagePack"
- place a bigger burden on the function writer (again, outside of languages where we explicitly publish a harness)
Conclusion
If the UD(a,w,t)F interface that we provide to our users, instead of WASM, is a limited set of languages, I could see the Arrow direct copy method (if possible, or IPC/Flight) winning performance-wise, just because we, as the function runner, won't be exposing any of the Arrow/WASM stuff to the user. The user will be able to write:
CREATE FUNCTION some_func(xs [REAL], a REAL, b REAL) RETURNS REAL AS
'
math.stdev(xs.map(x -> math.log(a + b*x)))
'
LANGUAGE SOMESCRIPT;
We'll then be able to add boilerplate code around this function (to call it in a loop on an Arrow array), compile it to WASM, dynamically link it to the Arrow library that's shared between all functions, etc. I think that's the ultimate long-term design goal for this feature. In the meantime, WASI + msgpack feels like the least-common-denominator solution if we want to support the largest number of languages.
Wow, very comprehensive comment, thank you!
I agree with all of this with the possible exception of the conclusion ;) but this depends on how Seafowl is positioned.
If we're primarily a database supporting a variety of languages one way or another, then WASM might not always be the most performant means to that goal.
If, on the other hand, our whole story is basically "the DB you can run on a toaster or a supercomputer along with all your UD{a,w,t}Fs", then it makes sense to think primarily in terms of WASM.
text/001-wasm-udfs.md (outdated)
A big advantage of using WASI is that it's easy to invoke UDFs from the command line. The Messagepack-encoded input can be written to a file, for example with […]
It's nice, but not a deal breaker. If I'm writing a test for my function, I'm likely going to be testing the actual implementation rather than the stdin+msgpack harness around it. If my language has a REPL, I don't need to encode my input as msgpack and then invoke the full program with wasmtime; I could execute the implementation directly.
One interesting possibility is allowing file / network access to the function via WASI. That way, one could create a table UDF that loads data from a remote API/file.
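As a sketch of how the host side could grant that access, loosely based on the wasmtime-wasi crate (builder methods and import paths differ across wasmtime versions, and the paths here are placeholders):

```rust
use anyhow::Result;
use wasmtime::{Engine, Linker, Module, Store};
use wasmtime_wasi::{ambient_authority, Dir, WasiCtxBuilder};

// Run a guest with stdio plus read access to a single preopened host
// directory, visible inside the sandbox as "/data"; nothing else on
// the host filesystem is reachable. "udf.wasm" is a placeholder.
fn run_with_fs_access() -> Result<()> {
    let engine = Engine::default();
    let mut linker = Linker::new(&engine);
    wasmtime_wasi::add_to_linker(&mut linker, |ctx| ctx)?;

    let wasi = WasiCtxBuilder::new()
        .inherit_stdio()
        .preopened_dir(Dir::open_ambient_dir("./data", ambient_authority())?, "/data")?
        .build();
    let mut store = Store::new(&engine, wasi);

    let module = Module::from_file(&engine, "udf.wasm")?;
    let instance = linker.instantiate(&mut store, &module)?;
    // WASI command modules export `_start` as their entrypoint.
    instance
        .get_typed_func::<(), ()>(&mut store, "_start")?
        .call(&mut store, ())?;
    Ok(())
}
```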
Agreed, this is not super-important.
I couldn't make WASI fast enough, so we're back to shared memory anyway, with the exception of stderr, which I found to be indispensable.
The part which might warrant some testing is input reading / parsing, but writing a minimal harness for this doesn't seem like a huge amount of work, and it will work with UDFs in any WASM language.
text/001-wasm-udfs.md (outdated)
- Which WASM languages should receive first-class support? Rust is pretty obvious, but e.g. C++ programmers may prefer a different MessagePack library than C devs.
What would be first-class support here, an example harness/template repo?
Yeah, an example / template UDF to get started.