fix(source): parse protobuf into expected struct/array #18419

Merged (4 commits) on Sep 11, 2024
e2e_test/source_inline/kafka/protobuf/recover.slt (97 additions, 0 deletions)
@xxchan (Member) commented on Sep 11, 2024:
I was wondering why it's a recover test, since it looks more like a schema-change test to me. During offline discussion, I found the story is quite complex:

This is what schema change is like:

dataV1     ->   dataV2      ->   dataVN
   |
schemaV1   ->   schemaV2   ->   schemaVN
   |
RW schema

Generally, the parsing process is:

bytes 
-> (decode with a schema) parsed data 
-> RW data

But there are subtleties, e.g., which schema should be used to decode the data?

method 1: (current Avro)
bytes 
-> (with schemaVN, aka "writer schema") dataVN       (For Avro, this step is required.)
-> (with schemaV1, aka "reader schema") dataV1 -> RW data

method 2: (current Protobuf)
bytes 
-> (with schemaV1, aka "reader schema") dataV1 
-> RW data
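The two decoding paths can be sketched with a toy Python model (plain dicts stand in for real Avro/Protobuf decoding; the `decode` helper and schema representation are illustrative, not RisingWave's actual API):

```python
# Toy model of the two decoding strategies. A "schema" is just a list of
# field names; "decoding" projects a record onto those fields.
def decode(record: dict, schema: list[str]) -> dict:
    # Unknown fields are dropped; missing fields default to None.
    return {field: record.get(field) for field in schema}

schema_v1 = ["first_name", "last_name"]                   # reader schema (RW schema derives from this)
schema_vn = ["first_name", "last_name", "middle_name"]    # latest writer schema

wire_record = {"first_name": "Alan", "last_name": "Turing",
               "middle_name": "Mathison"}                 # written with schemaVN

# Method 1 (Avro-style): decode with the writer schema first,
# then resolve the result against the reader schema.
data_vn = decode(wire_record, schema_vn)
data_v1_avro = decode(data_vn, schema_v1)

# Method 2 (Protobuf-style): decode directly with the reader schema.
data_v1_pb = decode(wire_record, schema_v1)

# Both paths end at the same reader-schema-shaped data.
assert data_v1_avro == data_v1_pb == {"first_name": "Alan",
                                      "last_name": "Turing"}
```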

But we can see that in both cases, a "reader schema" is involved. Currently, it is fetched on startup, which leads to a problem:

Problem of recovery

We don't persist schemaV1; instead, we just fetch the current schema on recovery (#12982).

It's actually like this:

dataV1     ->   dataVN      ->   dataVM
   |
schemaV1   ->   schemaVN   ->   schemaVM
   |               |
RW schema       fetched during recovery, and will be used as "reader schema"

So we may end up converting dataVN to RW data, instead of dataV1, which corresponds to schemaV1, the schema used to generate the RW schema.

Therefore, we need to carefully ensure the RW schema is respected when it's inconsistent with schemaVN (this is what this PR does). Otherwise, we would have to persist schemaV1.
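The guard this PR adds can be illustrated with a hedged Python sketch (names and the schema representation are hypothetical; the real implementation lives in RisingWave's Rust protobuf parser): decoded data is recursively projected onto the struct/array shape the RW schema expects, so fields added by a newer writer schema are dropped rather than causing a panic.

```python
# Sketch: recursively coerce decoded protobuf data into the RW schema's
# expected struct/array shape. A dict models a struct type, a one-element
# list models an array type, anything else is a scalar. Illustrative only.
def coerce(value, expected):
    if isinstance(expected, dict):        # struct: keep only expected fields
        value = value or {}
        return {f: coerce(value.get(f), t) for f, t in expected.items()}
    if isinstance(expected, list):        # array: coerce each element
        return [coerce(v, expected[0]) for v in (value or [])]
    return value                          # scalar: pass through

rw_schema = {"id": "int",
             "name": {"first_name": "str", "last_name": "str"}}
decoded_v2 = {"id": 1,
              "name": {"first_name": "Alan", "last_name": "Turing",
                       "middle_name": "Mathison"}}   # written with schemaV2

# middle_name (added in schemaV2) is dropped; the row matches the RW schema.
assert coerce(decoded_v2, rw_schema) == {
    "id": 1, "name": {"first_name": "Alan", "last_name": "Turing"}}
```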

Alternative

As mentioned above, since we already cannot ensure schemaV1 is present, why not always use the "writer schema", i.e., the latest schema, to decode the data, and then convert dataVN to RW data?

Although the RW schema is derived from schemaV1, this may be technically possible.

However, there's one subtle edge case: it's possible to use Protobuf without a schema registry, with the schema coming from a file or over HTTP. In that case, the schema might get lost and the source would be unrecoverable. Persisting schemaV1 would also handle this.

Note about REFRESH SCHEMA

What REFRESH SCHEMA does is evolve the RW schema to a new one corresponding to the latest schemaVN.
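In toy-model terms (illustrative Python, not RisingWave internals), REFRESH SCHEMA re-derives the RW schema from the latest registered schema version, after which newly added fields such as middle_name become visible:

```python
# Toy model: the registry maps version numbers to schemas (field lists).
registry = {1: ["first_name", "last_name"],
            2: ["first_name", "last_name", "middle_name"]}

rw_schema = registry[1]              # derived once, at CREATE SOURCE time

# What REFRESH SCHEMA effectively does: re-derive from the latest version.
rw_schema = registry[max(registry)]

assert rw_schema == ["first_name", "last_name", "middle_name"]
```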

Contributor (PR author) replied:

Thanks for elaborating on the issues here on my behalf 🙏

@@ -0,0 +1,97 @@
control substitution on

system ok
rpk topic create 'test-pb-struct'


system ok
jq -sR '{"schema":.,"schemaType":"PROTOBUF"}' << EOF | curl -X POST -H 'content-type: application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value/versions"
syntax = "proto3";
package test;
message User {
int32 id = 1;
Name name = 2;
}
message Name {
string first_name = 1;
string last_name = 2;
}
EOF


# create a source with v1 schema
statement ok
create source s with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test-pb-struct')
format plain encode protobuf (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User');


# register a v2 schema
system ok
jq -sR '{"schema":.,"schemaType":"PROTOBUF"}' << EOF | curl -X POST -H 'content-type: application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value/versions"
syntax = "proto3";
package test;
message User {
int32 id = 1;
Name name = 2;
}
message Name {
string first_name = 1;
string last_name = 2;
string middle_name = 3;
}
EOF


# trigger recovery
statement ok
recover;


sleep 2s


# produce a v2 message
statement ok
create sink sk as select
1 as id,
row('Alan', 'Turing', 'Mathison')::struct<first_name varchar, last_name varchar, middle_name varchar> as name
with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test-pb-struct')
format plain encode protobuf (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User');


sleep 1s


# reading as v1 shall not panic
query IT
select * from s;
----
1 (Alan,Turing)
Comment on lines +73 to +77
Contributor (PR author) commented:

The panic reproduced on first commit of this PR:
https://buildkite.com/risingwavelabs/pull-request/builds/57644#0191dac4-4ad6-42dc-8f8c-2c3e4b182ed5

failed to run `e2e_test/source_inline/kafka/protobuf/recover.slt`
Caused by:
    query failed: db error: ERROR: Failed to run the query
    Caused by:
      execution panic
    [SQL] select * from s;
    at e2e_test/source_inline/kafka/protobuf/recover.slt:74



statement ok
drop sink sk;


statement ok
drop source s;


system ok
curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value"


system ok
curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value?permanent=true"


system ok
rpk topic delete 'test-pb-struct'