fix(source): parse protobuf into expected struct/array #18419
Conversation
Force-pushed from f3395dd to a888d33
# reading as v1 shall not panic
query IT
select * from s;
----
1 (Alan,Turing)
The panic is reproduced on the first commit of this PR:
https://buildkite.com/risingwavelabs/pull-request/builds/57644#0191dac4-4ad6-42dc-8f8c-2c3e4b182ed5
failed to run `e2e_test/source_inline/kafka/protobuf/recover.slt`
Caused by:
query failed: db error: ERROR: Failed to run the query
Caused by:
execution panic
[SQL] select * from s;
at e2e_test/source_inline/kafka/protobuf/recover.slt:74
👍
which is aligned with json and avro parser.
Note: currently, none of the parsers fully ensure that type_expected is enforced. (Actually, I once wondered why there's the DataType parameter..) We may add a debug_assert to ensure the type is consistent.
Yes, it's not fully checked; just limiting the scope of this PR and deferring that.
I was wondering why it's a recover test, since it looks more like a schema-change test to me. During offline discussion, I found the story is quite complex:
This is what schema change is like:
dataV1   -> dataV2   -> dataVN
  |
schemaV1 -> schemaV2 -> schemaVN
  |
RW schema
Generally, the parsing process is:
bytes
-> (decode with a schema) parsed data
-> RW data
But there are subtleties, namely: which schema should be used to decode the data?
method 1: (current Avro)
bytes
-> (with schemaVN, aka "writer schema") dataVN (For Avro, this step is required.)
-> (with schemaV1, aka "reader schema") dataV1 -> RW data
method 2: (current Protobuf)
bytes
-> (with schemaV1, aka "reader schema") dataV1
-> RW data
But we can see that in both cases, there's a "reader schema" involved. Currently, it is fetched on startup, which leads to the problem:
Problem of recovery
Since we don't persist schemaV1, but just fetch the current schema on recovery (#12982), it's actually like this:
dataV1   -> dataVN   -> dataVM
  |
schemaV1 -> schemaVN -> schemaVM
  |            |
RW schema      fetched during recovery, and will be used as "reader schema"
So we may convert dataVN to RW data, instead of dataV1 corresponding to schemaV1, which is what was used to generate RW schema.
Therefore, we need to carefully ensure RW schema is respected when it's inconsistent with schemaVN (this is what this PR does). Otherwise, we should persist schemaV1.
Alternative
As mentioned above, since we already cannot ensure schemaV1 is present, why not always use the "writer schema", i.e., the latest schema, to decode the data, and then convert dataVN to RW data?
Although RW schema is converted from schemaV1, this may be technically possible.
However, there's one subtle edge case: it's possible to use protobuf without a schema registry, with the schema coming from a file or over HTTP. In that case, the schema might get lost and the source will be unrecoverable. If we persisted schemaV1, this case could also be handled.
Note about REFRESH SCHEMA
What REFRESH SCHEMA does is evolve RW schema to a new one corresponding to the latest schemaVN.
Thanks for elaborating on the issues here on my behalf 🙏
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Previously, the source protobuf parser ignored the expected column data type and returned whatever protobuf value was on the wire. This can potentially cause a data type / scalar variant mismatch panic, especially during schema evolution.
This PR fixes the protobuf parsing logic to be guided by the expected column data type, which aligns it with the JSON and Avro parsers.
As mentioned in #18380 (pre-req of this refactor/fix), the unit tests will be DRY-ed in a follow-up.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.