Author: Zara Lim (@jzaralim) | Release Target: 0.24.0; 7.2.0 | Status: Merged | Discussion: confluentinc#8293
tl;dr: Expose Kafka header data in ksqlDB. Currently, headers are the only metadata in a Kafka record that is invisible to users.
ksqlDB already exposes the timestamp, partition and offset of Kafka records as pseudocolumns. The only metadata left that is invisible to ksqlDB users is headers. Kafka headers are a list of zero or more key-value pairs, where the keys are (not necessarily unique) strings and the values are byte arrays. Headers typically contain metadata about records, which can then be used for routing or processing (e.g. a header could store information about how to deserialize strings in the value). If ksqlDB exposes headers to the user, then these steps could be done in ksqlDB.
In scope:
- Syntax for accessing headers
- Support for all query types except for `INSERT` queries
- Including header columns in a data source's schema
- Functions to help decode byte strings to primitive types (BYTES to STRING already exists; new functions would be conversions to INT, BIGINT and DOUBLE)

Not in scope:
- Writing headers to output topics - we might do this in the future, but that is a separate discussion
- Advanced functions to help decode byte strings (e.g. functions to deserialize complex serialized data) - if a user wants to convert something that we don't have a function for, they can either create their own UDF or use `CREATE FUNCTION` when that becomes available
Unlike other pseudocolumns, if a user wants to access headers, they will create a column with the `HEADERS` keyword. This column will be populated with the full list of header keys and values. There can only be one `HEADERS` column.
ksql> CREATE STREAM A (
my_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS);
If there are multiple `HEADERS` columns, then an error will be thrown. The type of a header-backed column must be exactly `ARRAY<STRUCT<key STRING, value BYTES>>`. An error will be thrown if that is not the case.
ksql> CREATE STREAM B (my_headers ARRAY<BYTES> HEADERS);
Error: Columns specified with the HEADERS keyword must be typed as ARRAY<STRUCT<key STRING, value BYTES>>.
Users can also mark a column with `HEADER(<key>)`, which will populate the column with the last header that matches the key. The data type of such a column must be `BYTES`.
ksql> CREATE STREAM B (value BYTES HEADER(<key>), value_2 BYTES HEADER(<key_2>));
If there is a column marked `HEADERS`, then there can be no `HEADER(<key>)` columns. Otherwise, there can be multiple `HEADER(<key>)` columns, but no two can share the same key. If the key is not present in the record's headers, then the value in the column will be `null`.
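For example, a sketch of how the duplicate-key rule would be enforced (the exact error wording here is illustrative, not final):

ksql> CREATE STREAM C (v1 BYTES HEADER('k'), v2 BYTES HEADER('k'));
Error: Columns v1 and v2 cannot both be marked with HEADER('k'); no two header columns can share the same key.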
Sources created with schema inference can have header columns; they just have to be specified explicitly. For example, the statement

ksql> CREATE STREAM A (my_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS)
        WITH (kafka_topic='a', value_format='avro');

will create a stream with the value columns from Schema Registry, as well as one header column.
There will also be new byte conversion functions to help decode header data:
- `INT_FROM_BYTES(bytes, [byte_order])` - The input byte array must be exactly 4 bytes long, otherwise an error will be thrown. The conversion will be handled by `ByteBuffer`'s `getInt` method. `byte_order` is a string that can be either `BIG_ENDIAN` or `LITTLE_ENDIAN`. If `byte_order` is not supplied, then the function will assume `BIG_ENDIAN`.
- `BIGINT_FROM_BYTES(bytes, [byte_order])` - The input byte array must be exactly 8 bytes long, otherwise an error will be thrown. The conversion will be handled by `ByteBuffer`'s `getLong` method. If `byte_order` is not supplied, then the function will assume `BIG_ENDIAN`.
- `DOUBLE_FROM_BYTES(bytes, [byte_order])` - The input byte array must be exactly 8 bytes long, otherwise an error will be thrown. The conversion will be handled by `ByteBuffer`'s `getDouble` method. If `byte_order` is not supplied, then the function will assume `BIG_ENDIAN`.
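As an illustration of how these functions might combine with key-extracted header columns, assuming a producer writes an 'attempt' header as a 4-byte little-endian integer (the stream, topic and header key names here are hypothetical):

ksql> CREATE STREAM clicks (url STRING, attempt BYTES HEADER('attempt'))
        WITH (kafka_topic='clicks', value_format='json');
ksql> SELECT url, INT_FROM_BYTES(attempt, 'LITTLE_ENDIAN') AS attempt_count
        FROM clicks EMIT CHANGES;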
In all of these cases, when an error is thrown, the error gets written to the processing log. If the function is part of the projection, then `null` will be written to the output column. If it is part of the predicate, then the whole record will be skipped. This is consistent with how errors are handled when functions with value column arguments throw errors.
Header columns will be usable as selection items, function arguments or predicates in any query, like any other column. The exceptions are `INSERT SELECT` and `INSERT VALUES` statements, which will fail if they try to insert into a header column. However, when persistent queries project header-backed columns, the header values are copied into the output row's value (or key), not to headers in the output record.
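A sketch of the expected `INSERT VALUES` failure against the stream `A` defined earlier (the error text is illustrative):

ksql> INSERT INTO A (my_headers) VALUES (NULL);
Error: Cannot insert into header columns: my_headers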
Header columns will fully be a part of the stream/table's schema. When running `SELECT *`, header columns will be included in the projection, and they will also appear when describing a source. However, headers will not be copied to the header fields of sink topics.
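For example, a sketch of how the header column of stream `A` might surface when describing the source (the exact output layout is illustrative):

ksql> DESCRIBE A;

 Field      | Type
 MY_HEADERS | ARRAY<STRUCT<KEY STRING, VALUE BYTES>> (headers)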
Columns in the logical schema will be represented by a new type of column, `HeaderColumn`, which will extend `Column` and contain an optional `String` named `key` representing the key in `HEADER(key)`. It will store `Optional.empty()` for columns defined with `HEADERS`. These columns will be in a new `HEADER` namespace, which functions identically to the `VALUE` namespace, except that the `SourceBuilder` will populate those columns with header values.
The following test cases will be added:
- Creating `STREAM`/`TABLE` with header columns
- `SELECT * FROM ...` queries include header columns in the output
- `DESCRIBE SOURCE ...` includes any header columns
- Push and pull queries may select headers
- Queries can use header columns in joins, UDFs, filters, group by and partition by clauses
- Tests for new functions
- Inserting values into header columns is blocked
- Persistent queries that were created in a previous version can still work
- Sink topic headers are unaffected by these changes
- Header columns can be used with the Java client, migrations tool and the CLI
Development of the feature will be done behind a feature flag, `ksql.headers.enabled`. When the flag is off, creating new streams and tables with header columns will be disabled, but existing sources with header columns will continue to work as usual.
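For example, while the flag is off, declaring header columns would be rejected along these lines (the error text is illustrative):

ksql> CREATE STREAM A (my_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS);
Error: Cannot create stream A: header columns are disabled. Set ksql.headers.enabled=true to enable them.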
- Create feature flag
- Add `HEADERS` to the ksqlDB syntax
- Add header columns to the `LogicalSchema`
- Update `SourceBuilder` to add headers
- Update anything else that would need to accommodate headers and the new `LogicalSchema`
- Disallow header columns in `INSERT` statements
- Add `HEADER(<key>)` to the ksqlDB syntax
- Update everything header-related in the code to enable key-extracted headers
- Remove the feature flag
- Implement new byte conversion functions
- The synopses for `CREATE STREAM|TABLE` will be updated to include header columns
- A new section in the docs page on pseudocolumns dedicated to describing how to access headers
- Add docs on each of the new functions
All queries created before the introduction of headers should still work. No compatibility implications are expected.
N/A
In this rejected alternative, a `ROWHEADERS` pseudocolumn works exactly like `ROWTIME`, `ROWOFFSET` and `ROWPARTITION`. This is the lowest-LOE approach to implement, but it was rejected for the following reasons:
- When performing joins, ksqlDB includes all columns in the repartition and changelog topics (confluentinc#7489), including pseudocolumns. `ROWHEADERS`, being a list of structs, would increase the size of the state store much more than the other pseudocolumns. The accepted approach makes headers opt-in, so at least the extra overhead is not forced on every join.
- The `ROWHEADERS` column would not be included in the schema (`SELECT *` and `DESCRIBE` won't include the column). The data in Kafka headers is populated by Kafka clients rather than by Kafka itself, so it makes sense to distinguish it from other system columns.
Another rejected approach was to designate the header-backed column in the WITH clause. Example:
-- populate `my_headers` from each row's record headers
CREATE STREAM FOO (my_headers ARRAY<STRUCT<key VARCHAR, value VARCHAR>>)
WITH (headers='my_headers');
- Supporting multiple header-backed columns would likely be awkward in a WITH-based approach. `HEADER(<key>)` would not work well in this approach.
- The accepted syntax aligns with the existing `KEY` and `PRIMARY KEY` keywords, which tell ksqlDB to populate a column from the record key instead of the record value (the default).
Another rejected alternative was to automatically convert KSQL types (like VARCHAR or INT) from the headers' byte-array values, based on the KSQL type assigned to the column:
CREATE STREAM FOO (my_headers STRUCT<version ARRAY<INT>> HEADERS);
CREATE STREAM FOO2 (latest_version INT HEADER('version'));
- No schema information is carried in the headers when they are added with the core `Headers` interface.
- The Connect headers implementation does support schemas and Schema Registry interop, but ksqlDB's support cannot assume all headers are written in this fashion.
- Converting bytes based on the structure of the column type could only handle certain byte encodings and byte orders. For example, the common integer-to-bytes encodings differ between Java and Python. These differences are better covered by explicit converter functions.
- Automatic conversion implies that the entire list of headers would be eagerly converted at processing time, even when the user may only want to pluck and convert the latest value for a header key.
A final rejected alternative was to specify the serialization format of the headers so that the bytes can be deserialized against a known format:
CREATE STREAM FOO (my_headers STRUCT<version ARRAY<INT>> HEADERS)
WITH (headers_format='kafka');
CREATE STREAM FOO2 (latest_version INT HEADER('version'))
WITH (headers_format='avro');
- There is no clear convention for using a single serializer to write header values. We should expect that it is common to simply call the equivalent of `getBytes` on an integer instance, for example.
- Unlike for keys and values, there is no serde configuration for the whole set of headers. Users can mix serialization approaches between different headers added to the same record.