Skip to content

Commit

Permalink
Merge pull request redpanda-data#19965 from BenPope/schema_registry/e…
Browse files Browse the repository at this point in the history
…nable_json_schema

[CORE-4729] schema_registry: Wire up JSON Schema support
  • Loading branch information
BenPope authored Jun 24, 2024
2 parents afed979 + d6274ce commit 30c6c7f
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ get_schemas_types(server::request_t rq, server::reply_t rp) {
rq.req.reset();

static const std::vector<std::string_view> schemas_types{
"PROTOBUF", "AVRO"};
"JSON", "PROTOBUF", "AVRO"};
auto json_rslt = ppj::rjson_serialize(schemas_types);
rp.rep->write_body("json", json_rslt);
return ss::make_ready_future<server::reply_t>(std::move(rp));
Expand Down
8 changes: 5 additions & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "pandaproxy/schema_registry/error.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/exceptions.h"
#include "pandaproxy/schema_registry/json.h"
#include "pandaproxy/schema_registry/protobuf.h"
#include "pandaproxy/schema_registry/store.h"
#include "pandaproxy/schema_registry/types.h"
Expand Down Expand Up @@ -85,7 +86,7 @@ sharded_store::make_canonical_schema(unparsed_schema schema) {
co_return co_await make_canonical_protobuf_schema(
*this, std::move(schema));
case schema_type::json:
throw as_exception(invalid_schema_type(schema.type()));
co_return co_await make_canonical_json_schema(*this, std::move(schema));
}
__builtin_unreachable();
}
Expand All @@ -100,7 +101,8 @@ ss::future<> sharded_store::validate_schema(canonical_schema schema) {
co_await validate_protobuf_schema(*this, std::move(schema));
co_return;
case schema_type::json:
throw as_exception(invalid_schema_type(schema.type()));
co_await make_json_schema_definition((*this), schema);
co_return;
}
__builtin_unreachable();
}
Expand All @@ -117,7 +119,7 @@ sharded_store::make_valid_schema(canonical_schema schema) {
co_return co_await make_protobuf_schema_definition(*this, schema);
}
case schema_type::json:
break;
co_return co_await make_json_schema_definition(*this, schema);
}
throw as_exception(invalid_schema_type(schema.type()));
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/pandaproxy/schema_registry/test/get_schema_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ FIXTURE_TEST(schema_registry_get_schema_types, pandaproxy_test_fixture) {
auto res = http_request(client, "/schemas/types");
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);
BOOST_REQUIRE_EQUAL(res.body, R"(["PROTOBUF","AVRO"])");
BOOST_REQUIRE_EQUAL(res.body, R"(["JSON","PROTOBUF","AVRO"])");
BOOST_REQUIRE_EQUAL(
res.headers.at(boost::beast::http::field::content_type),
to_header_value(ppj::serialization_format::schema_registry_v1_json));
Expand All @@ -46,7 +46,7 @@ FIXTURE_TEST(schema_registry_get_schema_types, pandaproxy_test_fixture) {
ppj::serialization_format::schema_registry_json);
BOOST_REQUIRE_EQUAL(
res.headers.result(), boost::beast::http::status::ok);
BOOST_REQUIRE_EQUAL(res.body, R"(["PROTOBUF","AVRO"])");
BOOST_REQUIRE_EQUAL(res.body, R"(["JSON","PROTOBUF","AVRO"])");
BOOST_REQUIRE_EQUAL(
res.headers.at(boost::beast::http::field::content_type),
to_header_value(ppj::serialization_format::schema_registry_json));
Expand Down
6 changes: 4 additions & 2 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,10 @@ class json_schema_definition {

///\brief A schema that has been validated.
class valid_schema {
using impl
= std::variant<avro_schema_definition, protobuf_schema_definition>;
using impl = std::variant<
avro_schema_definition,
protobuf_schema_definition,
json_schema_definition>;

template<typename T>
using disable_if_valid_schema = std::
Expand Down
11 changes: 11 additions & 0 deletions tests/rptest/tests/rpk_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
Simple id = 1;
}"""

json_number_schema_def = '{"type": "number"}'


class RpkRegistryTest(RedpandaTest):
username = "red"
Expand Down Expand Up @@ -112,6 +114,7 @@ def test_registry_schema(self):
subject_1 = "test_subject_1"
subject_2 = "test_subject_2"
subject_3 = "test_subject_3"
subject_4 = "test_subject_4"

self.create_schema(subject_1, schema1_avro_def,
".avro") # version: 1, ID: 1
Expand Down Expand Up @@ -200,6 +203,14 @@ def find_subject(list, subject):
out = self._rpk.list_schemas(deleted=True)
assert not find_subject(out, subject_1) # Not in the deleted list.

self.create_schema(subject_4, json_number_schema_def,
".json") # version: 1, ID: 4
out = self._rpk.get_schema(subject_4, version="1")
assert len(out) == 1
assert out[0]["subject"] == subject_4
assert out[0]["id"] == 4
assert out[0]["type"] == "JSON"

@cluster(num_nodes=1)
def test_registry_compatibility_level(self):
compat = self._rpk.get_compatibility_level()
Expand Down
44 changes: 40 additions & 4 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def get_subject_name(sns: str, topic: str, field: MessageField,
Simple id = 1;
}"""

json_number_schema_def = '{"type": "number"}'

log_config = LoggingConfig('info',
logger_levels={
'security': 'trace',
Expand Down Expand Up @@ -517,7 +519,7 @@ def test_schemas_types(self):
result_raw = self._get_schemas_types()
assert result_raw.status_code == requests.codes.ok
result = result_raw.json()
assert set(result) == {"PROTOBUF", "AVRO"}
assert set(result) == {"JSON", "PROTOBUF", "AVRO"}

@cluster(num_nodes=3)
def test_get_schema_id_versions(self):
Expand Down Expand Up @@ -1483,6 +1485,40 @@ def test_protobuf(self):
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40402

@cluster(num_nodes=3)
def test_json(self):
"""
Verify basic json functionality
"""

self.logger.info("Posting number as a subject key")
result_raw = self._post_subjects_subject_versions(
subject="simple",
data=json.dumps({
"schema": json_number_schema_def,
"schemaType": "JSON"
}))
self.logger.info(result_raw)
self.logger.info(result_raw.content)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == 1

result_raw = self._request("GET",
f"subjects/simple/versions/1/schema",
headers=HTTP_GET_HEADERS)
self.logger.info(result_raw)
assert result_raw.status_code == requests.codes.ok
assert result_raw.text.strip() == json_number_schema_def.strip()

result_raw = self._request("GET",
f"schemas/ids/1",
headers=HTTP_GET_HEADERS)
self.logger.info(result_raw)
assert result_raw.status_code == requests.codes.ok
result = result_raw.json()
assert result["schemaType"] == "JSON"
assert result["schema"].strip() == json_number_schema_def.strip()

@cluster(num_nodes=4)
@parametrize(protocol=SchemaType.AVRO, client_type=SerdeClientType.Python)
@parametrize(protocol=SchemaType.AVRO, client_type=SerdeClientType.Java)
Expand Down Expand Up @@ -2169,7 +2205,7 @@ def test_schemas_types(self):
result_raw = self._get_schemas_types(auth=self.super_auth)
assert result_raw.status_code == requests.codes.ok
result = result_raw.json()
assert set(result) == {"PROTOBUF", "AVRO"}
assert set(result) == {"JSON", "PROTOBUF", "AVRO"}

@cluster(num_nodes=3)
def test_get_schema_id_versions(self):
Expand Down Expand Up @@ -2780,7 +2816,7 @@ def test_mtls(self):
self.admin_user.certificate.key))
assert result_raw.status_code == requests.codes.ok
result = result_raw.json()
assert set(result) == {"PROTOBUF", "AVRO"}
assert set(result) == {"JSON", "PROTOBUF", "AVRO"}


class SchemaRegistryMTLSAndBasicAuthTest(SchemaRegistryMTLSBase):
Expand All @@ -2801,7 +2837,7 @@ def test_mtls_and_basic_auth(self):
self.admin_user.certificate.key))
assert result_raw.status_code == requests.codes.ok
result = result_raw.json()
assert set(result) == {"PROTOBUF", "AVRO"}
assert set(result) == {"JSON", "PROTOBUF", "AVRO"}


class SchemaValidationEnableWithoutSchemaRegistry(RedpandaTest):
Expand Down

0 comments on commit 30c6c7f

Please sign in to comment.