Author: agavra | Release Target: 5.3 | Status: In Discussion | Discussion: link
tl;dr: Address the most common feature requests related to UDF/UDAFs including struct support, generic types, variable arguments and complex aggregation
There have been many requests (#2163) to improve the UDF/UDAF interface in KSQL. Of these features, four stand out:
- Custom Struct Support - UDFs can only operate on standard Java types (`Map`, `List`, `String`, etc.) and do not accept structured types. This dramatically limits what KSQL can handle, as much of the data flowing through Kafka is structured Avro/JSON.
- Generics Type Support - Today, there is no way to implement a single UDF that supports data structures with implicit types. For example, implementing a generic `list_union` operator would require a different method signature for each supported type. This quickly becomes unmanageable when there are multiple arguments.
- Variable Argument Support - Varargs are required to create UDFs that accept an arbitrary number of inputs. Creating a simple `concat` UDF that could concatenate multiple columns would require a separate method signature for every number of inputs to be supported.
- Complex Aggregation Models - Today, KSQL only supports tracking a single variable for aggregation, and it must mirror the output type. To implement an `avg` UDAF, it is necessary to store both a `count` and a `running_total`.
All changes will be made for the new annotation-based UDF interfaces and will not be backported to the legacy implementation. The bullet points mentioned above are all in scope, but the following will not be included in this KLIP:
- UDTFs - multiple output operations are more complicated and will require a KLIP of their own
- Generics/Varargs for UDAFs - the support for Generics/Varargs will only extend to UDFs.
Basic UDF operations that are supported in SQL languages will now be unblocked for KSQL. Some of these are outlined below:
Improvement | Value |
---|---|
Generics support | Unblock ARRAY_LENGTH , ARRAY_SLICE , ARRAY_TO_STRING , etc... |
Varargs support | CONCAT , MAX , MIN etc... |
Complex Aggregation | AVG , PERCENTILE , etc... |
Structured UDFs | Arbitrary data handling without intermediary transforms |
Some public APIs will be changed, though all will be changed in a data backwards compatible fashion:
Improvement | API change |
---|---|
Generics support | UDF interface will accept inferred generics (e.g. List<T> ) |
Varargs support | UDF interface will accept ... in the method declarations |
Complex Aggregation | UDAFs may include VR export(A agg) to convert the aggregate into the output value |
Structured UDFs | UDF/UDAF interfaces will accept the Kafka Connect Struct type |
Each of these improvements is relatively self-contained and can be worked on independently and in parallel. Below are detailed designs for each:
After the refactoring done in #2411, the type coercion ensures that validation does not unnecessarily prevent usage of `Struct`. To complete support for structured UDFs, we also need a mechanism to create the output schema. To address this, we can resolve the schema as part of the UDF specification, requiring users to specify the struct schema inline (which will be parsed using the same ANTLR grammar).
```java
@UdfSchema
public static final Schema SCHEMA = SchemaBuilder.struct()...build();

@Udf(schema="STRUCT<val VARCHAR, length INT>")
public Struct generate(
    @UdfParam(schema="STRUCT<val VARCHAR>") final Struct from
) {
  return new Struct(...);
}
```
`UdfFactory` could then load the correct UDF by matching the schema of the supplied type against the candidate methods. It will be possible to support multiple methods with different struct schemas if the name of the UDF method is changed (note that the Java method name is ignored during evaluation).
Today, we eagerly convert UDF output types to the corresponding Connect schema types when we compile the UDFs, not when we use them in a `SELECT` clause. Since there is no corresponding "unresolved" type in Connect, this conversion fails. To solve this problem, we can infer the type from the source schema and compile the code to work with any component type (e.g. omit the generics from the compiled code).
```java
@Udf("returns a sublist of 'list'")
public <T> List<T> sublist(
    @UdfParameter final List<T> list,
    @UdfParameter final int start,
    @UdfParameter final int end) {
  return list.subList(start, end);
}
```
We need to ensure that the output type can be inferred. This means that we need to fail if the signature is something like `<T> List<T> convert(List<String> list)`. This can be done during compilation by following a simple rule: any generic type used in the output must be present in at least one of the parameters. We can access this information via reflection.
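As a rough illustration of the rule above, the following sketch uses plain Java reflection to check that every type variable in a method's return type also appears in at least one parameter type. `GenericOutputCheck` and all of its methods are hypothetical names made up for this example and are not part of KSQL; the two sample signatures mirror the ones discussed in the text.

```java
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not KSQL code): validates that generic outputs can be inferred.
final class GenericOutputCheck {

  // Recursively collects every type variable (e.g. T) mentioned inside a type.
  static void collect(final Type type, final Set<TypeVariable<?>> out) {
    if (type instanceof TypeVariable) {
      out.add((TypeVariable<?>) type);
    } else if (type instanceof ParameterizedType) {
      for (final Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
        collect(arg, out);
      }
    } else if (type instanceof GenericArrayType) {
      // Covers generic varargs such as T...
      collect(((GenericArrayType) type).getGenericComponentType(), out);
    } else if (type instanceof WildcardType) {
      for (final Type bound : ((WildcardType) type).getUpperBounds()) {
        collect(bound, out);
      }
      for (final Type bound : ((WildcardType) type).getLowerBounds()) {
        collect(bound, out);
      }
    }
  }

  // True if every type variable in the return type appears in some parameter type.
  static boolean outputIsInferable(final Method method) {
    final Set<TypeVariable<?>> inputs = new HashSet<>();
    for (final Type param : method.getGenericParameterTypes()) {
      collect(param, inputs);
    }
    final Set<TypeVariable<?>> outputs = new HashSet<>();
    collect(method.getGenericReturnType(), outputs);
    return inputs.containsAll(outputs);
  }

  // Looks up one of the sample methods below by name.
  static Method find(final String name) {
    for (final Method m : GenericOutputCheck.class.getDeclaredMethods()) {
      if (m.getName().equals(name)) {
        return m;
      }
    }
    throw new IllegalArgumentException("no such method: " + name);
  }

  // Inferable: T appears in the 'list' parameter.
  static <T> List<T> sublist(final List<T> list, final int start, final int end) {
    return list.subList(start, end);
  }

  // Not inferable: T only appears in the return type.
  static <T> List<T> convert(final List<String> list) {
    return null;
  }
}
```

Under these assumptions, `outputIsInferable(find("sublist"))` holds while `outputIsInferable(find("convert"))` does not, so the second signature would be rejected at load time.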
The `DESCRIBE FUNCTION` call to describe such generic values will display a SQL type that is not supported in DDL statements (e.g. `ARRAY<ANY<T>>`) (see the documentation section).
NOTE: Supporting a wildcard output type is not covered by this design, since we would not be able to generate the output schema for select statements. However, supporting wildcards in the input types (e.g. `Long length(List<?> list)`) should be possible.
Varargs boils down to supporting native Java arrays as parameters and ensuring that component types are resolved properly. Any time a method is registered with variable arguments (e.g. `String... args`), we will register a corresponding function with the wrapping array type (e.g. `String[]`).
We will need to resolve methods such as `foo(Int, String...)` and `foo(String, String...)`, as well as accept empty arguments to `foo(String...)`. If any parameter is `null`, it will be considered valid for any vararg declaration.
```java
@Udf("sums all arguments in 'args'")
public int sum(@UdfParameter final int... args) {
  return Arrays.stream(args).sum();
}
```
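The resolution rules described above can be sketched with plain reflection. This is only an illustration under simplifying assumptions: `VarargsResolution`, `accepts` and `resolve` are hypothetical names, only boxed and reference parameter types are handled, and a call that matches several overloads (for example a `null` first argument) is left unresolved in the sense that whichever match is found first wins.

```java
import java.lang.reflect.Method;
import java.util.List;

// Hypothetical sketch (not KSQL code): resolves overloads that declare varargs.
final class VarargsResolution {

  // Sample overloads mirroring foo(Int, String...) and foo(String, String...).
  static String foo(final Integer first, final String... rest) {
    return "int-first";
  }

  static String foo(final String first, final String... rest) {
    return "string-first";
  }

  // Accepts zero or more strings; reflection reports its parameter as String[].
  static String bar(final String... args) {
    return "bar";
  }

  // True if 'args' can be passed to 'method'. A null argument matches any parameter.
  static boolean accepts(final Method method, final List<?> args) {
    final Class<?>[] params = method.getParameterTypes();
    final int fixed = method.isVarArgs() ? params.length - 1 : params.length;
    if (args.size() < fixed) {
      return false;
    }
    if (!method.isVarArgs() && args.size() != fixed) {
      return false;
    }
    for (int i = 0; i < args.size(); i++) {
      // Past the fixed parameters, compare against the vararg component type.
      final Class<?> expected = i < fixed ? params[i] : params[fixed].getComponentType();
      final Object arg = args.get(i);
      if (arg != null && !expected.isInstance(arg)) {
        return false;
      }
    }
    return true;
  }

  // Returns the first sample method named 'name' that accepts 'args', or null if none does.
  static Method resolve(final String name, final List<?> args) {
    for (final Method m : VarargsResolution.class.getDeclaredMethods()) {
      if (m.getName().equals(name) && accepts(m, args)) {
        return m;
      }
    }
    return null;
  }
}
```

In this sketch the vararg parameter is matched element by element against its component type, which is one way to honour both the "empty arguments" and the "null is always valid" rules without special cases.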
Complex aggregation for UDAFs requires the ability to aggregate on a value that is later reduced to a value of a different type. To support this, we will change the `Udaf` interface in a backwards incompatible way (code built using the old `Udaf` interface will not compile, but migration is straightforward):
```java
/**
 * ...
 * @param <V> value type (the value that is accepted)
 * @param <A> aggregate type
 * @param <VR> reduce type (the value to return)
 */
public interface Udaf<V, A, VR> {
  ...
  /**
   * Reduces an aggregate into the return type.
   *
   * @param aggregate the running aggregate
   * @return the final return value
   */
  VR reduce(A aggregate);
}
```
`reduce` will be passed as the behavioral parameter to a `mapValues` task that will be applied after the aggregate task in the generated KStreams topology.
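To make the split between the aggregate type and the return type concrete, here is a dependency-free sketch of an average. `SketchUdaf` is a local stand-in that only borrows the method names used in this document; it is not the real KSQL interface. `AverageSketch` and `run` are likewise hypothetical, and returning `0.0` for an empty input is an arbitrary choice made for the example.

```java
import java.util.List;

// Local stand-in for the proposed three-parameter interface (an assumption, not the KSQL type).
interface SketchUdaf<V, A, VR> {
  A initialize();

  A aggregate(V value, A aggregate);

  A merge(A aggOne, A aggTwo);

  VR reduce(A aggregate);
}

// Averages longs: the aggregate is {sum, count}, the returned value is a Double.
final class AverageSketch implements SketchUdaf<Long, long[], Double> {

  @Override
  public long[] initialize() {
    return new long[] {0L, 0L};
  }

  @Override
  public long[] aggregate(final Long value, final long[] aggregate) {
    return new long[] {aggregate[0] + value, aggregate[1] + 1};
  }

  @Override
  public long[] merge(final long[] aggOne, final long[] aggTwo) {
    return new long[] {aggOne[0] + aggTwo[0], aggOne[1] + aggTwo[1]};
  }

  @Override
  public Double reduce(final long[] aggregate) {
    // Arbitrary choice for this sketch: an empty aggregate reduces to 0.0.
    return aggregate[1] == 0 ? 0.0 : (double) aggregate[0] / aggregate[1];
  }

  // Mimics "aggregate, then map the values": fold all inputs, then apply reduce once.
  static <V, A, VR> VR run(final SketchUdaf<V, A, VR> udaf, final List<V> values) {
    A agg = udaf.initialize();
    for (final V value : values) {
      agg = udaf.aggregate(value, agg);
    }
    return udaf.reduce(agg);
  }
}
```

The intermediate `long[]` never leaves the aggregation step; only the result of `reduce` would be visible to the query, which is the property the `mapValues` step is meant to provide.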
Each of these improvements will come with some built-in UDF/UDAF examples. These examples will then be comprehensively covered by both unit tests as well as corresponding Query Translation Tests. Specifically, the UDFs described in the Value section will all be implemented alongside this KLIP. For the new UDAF complex aggregation, we will ensure that the generated topologies do not include any unexpected steps such as a repartition.
- The `Example UDF Class` in `udf.rst` will be extended to showcase the new features. Namely, we will replace the multiply double method to include varargs:
  ```java
  @Udf(description = "multiply N non-nullable DOUBLEs.")
  public double multiply(final double... values) {
    // Use 1.0 as the identity so that reduce returns a double rather than an OptionalDouble.
    return Arrays.stream(values).reduce(1.0, (a, b) -> a * b);
  }
  ```
- The `DESCRIBE FUNCTION` output for vararg functions will need to be updated to use the `...` syntax:
  ```
  Name        : ARRAYCONTAINS
  Author      : Confluent
  Type        : scalar
  Jar         : internal
  Variations  :

      Variation   : ARRAYCONTAINS(VARCHAR...)
      Returns     : BOOLEAN
  ```
- The `Supported Types` section in `udf.rst` will include `Struct` and have an updated note at the bottom which reads:
  Note: Complex types other than List, Map and Struct are not currently supported. Generic types for List and Map are supported, but component elements must be of one of the supported types. If the types are generic, the output type must be able to be inferred from one or more of the input parameters.
- Function descriptions as part of `DESCRIBE FUNCTION` should also show the schema for any `Struct` that is used:
  ```
  Name        : FOO
  Author      : Confluent
  Type        : scalar
  Jar         : internal
  Variations  :

      Variation   : FOO(STRUCT<'VAL' BIGINT, 'OTHER' VARCHAR>)
      Returns     : STRUCT<'VAL' BIGINT, 'ANOTHER' LIST<VARCHAR>>
  ```
- The `DESCRIBE FUNCTION` output should indicate when a function accepts generics using `ANY<?>`, where `?` is an identifier that starts at `T1` and is incremented for each inferred type:
  ```
  Name        : FOO
  Author      : Confluent
  Type        : scalar
  Jar         : internal
  Variations  :

      Variation   : FOO(ARRAY<ANY<T1>>, ANY<T2>)
      Returns     : ANY<T1>
  ```
- The `UDAF` section in `udf.rst` will update the example from `Sum` to `Average` and include non-trivial usage of the `reduce` API:
  The class below creates a UDAF named `my_average`.
  ```java
  @UdafDescription(name="my_average", description="averages")
  public class AverageUdaf {

    // The intermediate aggregate tracks both the running sum and the count.
    private static final Schema SCHEMA = SchemaBuilder.struct()
        .field("sum", Schema.INT64_SCHEMA)
        .field("count", Schema.INT32_SCHEMA)
        .build();

    @UdafFactory(description = "averages longs")
    public static TableUdaf<Long, Struct, Double> createAverageLong() {
      return new TableUdaf<Long, Struct, Double>() {

        @Override
        public Struct initialize() {
          return new Struct(SCHEMA).put("sum", 0L).put("count", 0);
        }

        @Override
        public Struct aggregate(final Long value, final Struct aggregate) {
          return new Struct(SCHEMA)
              .put("sum", aggregate.getInt64("sum") + value)
              .put("count", aggregate.getInt32("count") + 1);
        }

        @Override
        public Struct undo(final Long valueToUndo, final Struct aggregate) {
          return new Struct(SCHEMA)
              .put("sum", aggregate.getInt64("sum") - valueToUndo)
              .put("count", aggregate.getInt32("count") - 1);
        }

        @Override
        public Struct merge(final Struct aggOne, final Struct aggTwo) {
          return new Struct(SCHEMA)
              .put("sum", aggOne.getInt64("sum") + aggTwo.getInt64("sum"))
              .put("count", aggOne.getInt32("count") + aggTwo.getInt32("count"));
        }

        @Override
        public Double reduce(final Struct aggregate) {
          // Convert the sum to a double before dividing to avoid integer division.
          return ((double) aggregate.getInt64("sum")) / aggregate.getInt32("count");
        }
      };
    }
  }
  ```
- For exportable UDAFs, we will also need to generate updated documentation for `DESCRIBE FUNCTION` calls to make sure that the output value is not the intermediate value, but rather the final exported value. For example:
  ```
  ksql> DESCRIBE FUNCTION AVERAGE;

  Name        : AVERAGE
  Author      : Confluent
  Overview    : Returns the average value for a window.
  Type        : aggregate
  Jar         : internal
  Variations  :

      Variation   : AVERAGE(BIGINT)
      Returns     : DOUBLE

  // Note that the signature for this is actually Udaf<Long, Struct, Double> as seen above
  ```
- Syntax reference needs to be updated to reflect any new UDF/UDAFs that are implemented as part of this KLIP.
- Custom UDAF implementations will fail to compile. Upgrade notes will include a section detailing the following upgrade semantics:
  To update to 5.3, any custom UDAF implementations must make the following changes in order to compile:
  - Add a third type parameter to your class signature that is equivalent to the second (e.g. `MyUdaf<String, Long>` -> `MyUdaf<String, Long, Long>`).
  - Implement the `VR reduce(A aggregate)` method. Since old implementations of UDAF always had `VR == A`, this can simply be `return aggregate;`.
  There will be no changes to the functionality of your custom UDAF.
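The two migration steps above might look roughly like this for a counting UDAF. `MigratedUdafContract` is a local stand-in declared only so the example compiles on its own; it is an assumption and not the real KSQL `Udaf` interface, and `MyUdaf` is the placeholder name used in the upgrade note.

```java
// Local stand-in for the new three-parameter interface (an assumption, not the KSQL type).
interface MigratedUdafContract<V, A, VR> {
  A initialize();

  A aggregate(V value, A aggregate);

  A merge(A aggOne, A aggTwo);

  VR reduce(A aggregate);
}

// Before the upgrade this would have been declared against two type parameters: <String, Long>.
// Step 1: repeat the aggregate type as the third type parameter.
final class MyUdaf implements MigratedUdafContract<String, Long, Long> {

  @Override
  public Long initialize() {
    return 0L;
  }

  // Counts the values seen so far; unchanged by the migration.
  @Override
  public Long aggregate(final String value, final Long aggregate) {
    return aggregate + 1;
  }

  @Override
  public Long merge(final Long aggOne, final Long aggTwo) {
    return aggOne + aggTwo;
  }

  // Step 2: the new method is the identity because the output type equals the aggregate type.
  @Override
  public Long reduce(final Long aggregate) {
    return aggregate;
  }
}
```

Because `reduce` returns its argument unchanged, the migrated class produces the same results as before.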
N/A