Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: alter the schema registry of table with connector #15025

Merged
merged 16 commits into from
Mar 8, 2024
Merged
71 changes: 40 additions & 31 deletions e2e_test/schema_registry/alter_sr.slt
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,68 @@ FORMAT PLAIN ENCODE PROTOBUF(
statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

# Changing type is not allowed
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(city: character varying\)
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithNewType'
);

# Changing format/encode is not allowed
statement error Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Protobuf, and altering them is not supported yet
ALTER SOURCE src_user FORMAT NATIVE ENCODE PROTOBUF(
statement ok
CREATE TABLE t_user WITH (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

statement ok
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithMoreFields'
);
statement error
SELECT age FROM mv_user;

# Dropping columns is not allowed
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(age: integer\)
statement error
SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields

sleep 5s

# Refresh source schema
statement ok
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);
Comment on lines 45 to 48
Copy link
Member

@fuyufjh fuyufjh Mar 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, altering table with same format & encoding but different properties is also supported in this PR, right?

ALTER [SOURCE|TABLE] <name> FORMAT ... ENCODE ... (
    (... new properties ...)
);

If so, please update the PR description.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I reverted the implementation of ALTER TABLE FORMAT ENCODE as we discussed in slack that it should be suspended and needs more consideration. This pr is only about ALTER TABLE REFRESH SCHEMA.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Only ALTER SOURCE works, ALTER TABLE ... FORMAT ... ENCODE is not in this PR.

Copy link
Member

@fuyufjh fuyufjh Mar 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me summarize. So after this PR gets merged, it will be like

  • ALTER TABLE <name> FORMAT ... ENCODE ... ( ... )
  • ALTER SOURCE <name> FORMAT ... ENCODE ... ( ... )
  • ALTER TABLE <name> REFRESH SCHEMA
  • ALTER SOURCE <name> REFRESH SCHEMA

Right? Shall we complete the rest two in the future? Asked because I think we should keep the syntax of create source & table unified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right.
ALTER SOURCE <name> REFRESH SCHEMA is on progress.
ALTER TABLE <name> FORMAT ... ENCODE ... ( ... ) will start after further discussion.


statement ok
CREATE MATERIALIZED VIEW mv_more_fields AS SELECT * FROM src_user;
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

# Refresh table schema
statement ok
ALTER TABLE t_user REFRESH SCHEMA;

query IIII
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
25 4 0 10

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields

sleep 10s
sleep 5s

query I
SELECT COUNT(*) FROM mv_user;
query IIII
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;
----
25

statement error
SELECT SUM(age) FROM mv_user;
30 4 0 10

query III
SELECT COUNT(*), MAX(age), MIN(age) FROM mv_more_fields;
----
25 4 0
statement ok
DROP MATERIALIZED VIEW mv_user_more;

statement ok
DROP MATERIALIZED VIEW mv_user;
DROP TABLE t_user;

statement ok
DROP MATERIALIZED VIEW mv_more_fields;
DROP MATERIALIZED VIEW mv_user;

statement ok
DROP SOURCE src_user;
27 changes: 9 additions & 18 deletions e2e_test/schema_registry/pb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from protobuf import user_pb2
from google.protobuf.source_context_pb2 import SourceContext
import sys
import importlib
from google.protobuf.source_context_pb2 import SourceContext
from confluent_kafka import Producer
from confluent_kafka.serialization import (
SerializationContext,
Expand All @@ -26,7 +26,7 @@ def get_user(i):
)

def get_user_with_more_fields(i):
return user_pb2.UserWithMoreFields(
return user_pb2.User(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
Expand All @@ -36,16 +36,6 @@ def get_user_with_more_fields(i):
age=i,
)

def get_user_with_new_type(i):
return user_pb2.UserWithNewType(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
city=i,
gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE,
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
)

def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message):
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
serializer = ProtobufSerializer(
Expand All @@ -69,7 +59,7 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u


if __name__ == "__main__":
if len(sys.argv) < 5:
if len(sys.argv) < 6:
print("pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>")
exit(1)

Expand All @@ -79,10 +69,11 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u
num_records = int(sys.argv[4])
pb_message = sys.argv[5]

user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2')

all_pb_messages = {
'user': (get_user, user_pb2.User),
'user_with_more_fields': (get_user_with_more_fields, user_pb2.UserWithMoreFields),
'user_with_new_type': (get_user_with_new_type, user_pb2.UserWithNewType),
'user': get_user,
'user_with_more_fields': get_user_with_more_fields,
}

assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}'
Expand All @@ -91,7 +82,7 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u
producer_conf = {"bootstrap.servers": broker_list}

try:
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, *all_pb_messages[pb_message])
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User)
except Exception as e:
print("Send Protobuf data to schema registry and kafka failed {}", e)
exit(1)
19 changes: 0 additions & 19 deletions e2e_test/schema_registry/protobuf/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,3 @@ enum Gender {
MALE = 0;
FEMALE = 1;
}

message UserWithMoreFields {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
int32 age = 7; // new field here
}

message UserWithNewType {
int32 id = 1;
string name = 2;
string address = 3;
int32 city = 4; // change the type from string to int32
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
}
10 changes: 3 additions & 7 deletions e2e_test/schema_registry/protobuf/user_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions e2e_test/schema_registry/protobuf/user_with_more_fields.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
syntax = "proto3";

package test;

import "google/protobuf/source_context.proto";

message User {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
int32 age = 7; // new field here
}

enum Gender {
MALE = 0;
FEMALE = 1;
}
29 changes: 29 additions & 0 deletions e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading