diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index cfba321..5c14d10 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -23,6 +23,7 @@ */ package com.bakdata.schemaregistrymock; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import com.github.tomakehurst.wiremock.WireMockServer; @@ -36,12 +37,14 @@ import com.github.tomakehurst.wiremock.http.ResponseDefinition; import com.github.tomakehurst.wiremock.matching.UrlPathPattern; import com.github.tomakehurst.wiremock.matching.UrlPattern; +import com.google.common.base.CharMatcher; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; @@ -54,11 +57,14 @@ /** *

The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.

- * In particular, + * In particular, you can * * *

If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at @@ -96,6 +102,7 @@ public class SchemaRegistryMock { private static final String ALL_SUBJECT_PATTERN = "/subjects"; private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions"; private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/"; + private static final String SPECIFIC_SCHEMA_PATTERN = "/subjects/[^/]+"; private static final int IDENTITY_MAP_CAPACITY = 1000; private final ListVersionsHandler listVersionsHandler = new ListVersionsHandler(); @@ -103,10 +110,11 @@ public class SchemaRegistryMock { private final AutoRegistrationHandler autoRegistrationHandler = new AutoRegistrationHandler(); private final DeleteSubjectHandler deleteSubjectHandler = new DeleteSubjectHandler(); private final AllSubjectsHandler allSubjectsHandler = new AllSubjectsHandler(); + private final SchemaHandler schemaHandler = new SchemaHandler(); private final WireMockServer mockSchemaRegistry = new WireMockServer( WireMockConfiguration.wireMockConfig().dynamicPort() .extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler, - this.deleteSubjectHandler, this.allSubjectsHandler)); + this.deleteSubjectHandler, this.allSubjectsHandler, this.schemaHandler)); private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); private static UrlPattern getSchemaPattern(final Integer id) { @@ -139,6 +147,8 @@ public void start() { .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName()))); + this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlPathMatching(SPECIFIC_SCHEMA_PATTERN)) + .willReturn(WireMock.aResponse().withTransformers(this.schemaHandler.getName()))); } public void stop() { @@ -234,8 +244,8 @@ private Collection listAllSubjects() { } private abstract static class SubjectsHandler extends ResponseDefinitionTransformer { - // Expected url pattern /subjects(/.*-value/versions) - protected final Splitter urlSplitter = Splitter.on('/').omitEmptyStrings(); + // Expected url pattern /subjects(/.*-value/(versions|?param)) + protected final Splitter urlSplitter = Splitter.on(CharMatcher.anyOf("/?")).omitEmptyStrings(); @Override public boolean applyGlobally() { @@ -336,4 +346,56 @@ public String getName() { return AllSubjectsHandler.class.getSimpleName(); } } + + /** + * The SchemaHandler returns version and id for a given schema. + * + * Note: It returns "Schema not found, Error code 40403" for a deleted schema even if the request parameter + * 'deleted' is set to true. The MockSchemaRegistryClient does not save the version of a deleted schema. + */ + private class SchemaHandler extends SubjectsHandler { + @Override + public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, + final FileSource files, final Parameters parameters) { + final String requestSubject = this.getSubject(request); + // Check if requestSubject exists. This is required because the mock always throws an exception + // with 'Schema not found' + final boolean subjectExists = SchemaRegistryMock.this.listAllSubjects().stream() + .anyMatch(subject -> subject.equals(requestSubject)); + + if (!subjectExists) { + final ErrorMessage error = new ErrorMessage(40401, "Subject not found"); + return ResponseDefinitionBuilder.jsonResponse(error, HTTP_NOT_FOUND); + } + + final Schema schema; + try { + schema = new Schema.Parser() + .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()); + } catch (final IOException e) { + final ErrorMessage error = + new ErrorMessage(HTTP_INTERNAL_ERROR, "Error while looking up schema under subject topic"); + return ResponseDefinitionBuilder.jsonResponse(error, HTTP_INTERNAL_ERROR); + } + + try { + final int schemaId = SchemaRegistryMock.this.schemaRegistryClient.getId(requestSubject, schema); + final int schemaVersion = + SchemaRegistryMock.this.schemaRegistryClient.getVersion(requestSubject, schema); + return ResponseDefinitionBuilder + .jsonResponse(new io.confluent.kafka.schemaregistry.client.rest.entities.Schema( + requestSubject, schemaVersion, schemaId, schema.toString())); + } catch (final RestClientException | IOException e) { + final ErrorMessage error = new ErrorMessage(40403, "Schema not found"); + return ResponseDefinitionBuilder.jsonResponse(error, HTTP_NOT_FOUND); + } + } + + @Override + public String getName() { + return SchemaHandler.class.getSimpleName(); + } + + } + } diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java index f2a1c9f..b98cebe 100644 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java @@ -26,6 +26,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; @@ -97,7 +98,8 @@ void shouldHaveSchemaVersions() throws IOException, RestClientException { final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); assertThat(versions.size()).isOne(); - final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); + final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient() + .getSchemaMetadata(topic + "-value", versions.get(0)); assertThat(metadata.getId()).isEqualTo(id); final String schemaString = metadata.getSchema(); final Schema retrievedSchema = new Schema.Parser().parse(schemaString); @@ -128,7 +130,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); assertThat(versions.size()).isEqualTo(2); - final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); + final SchemaMetadata metadata = + this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); final int metadataId = metadata.getId(); assertThat(metadataId).isNotEqualTo(id1); assertThat(metadataId).isEqualTo(id2); @@ -140,7 +143,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { @Test void shouldNotHaveLatestSchemaVersionForUnknownSubject() { assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) + .isThrownBy( + () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } @@ -211,32 +215,109 @@ void shouldNotDeleteUnknownSubject() { .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } - @Test void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException { final Schema valueSchema = createSchema("value_schema"); final String topic = "test-topic"; final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); + final SchemaRegistryClient schemaRegistryClient = this.schemaRegistry.getSchemaRegistryClient(); + final List versions = schemaRegistryClient.getAllVersions(topic + "-value"); assertThat(versions.size()).isOne(); - final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); + final SchemaMetadata metadata = schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0)); assertThat(metadata.getId()).isEqualTo(id); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) + assertThat(schemaRegistryClient.getLatestSchemaMetadata(topic + "-value")) .isNotNull(); this.schemaRegistry.deleteValueSchema(topic); assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value")) + .isThrownBy(() -> schemaRegistryClient.getAllVersions(topic + "-value")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0))) + .isThrownBy(() -> schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0))) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) + .isThrownBy(() -> schemaRegistryClient.getLatestSchemaMetadata(topic + "-value")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } + @Test + void shouldReturnValueSchemaVersion() throws IOException, RestClientException { + final Schema valueSchema = createSchema("value_schema"); + this.schemaRegistry.registerValueSchema("test-topic", valueSchema); + + final Integer version = + this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema); + assertThat(version).isEqualTo(1); + } + + @Test + void shouldReturnKeySchemaVersion() throws IOException, RestClientException { + final Schema valueSchema = createSchema("value_schema"); + this.schemaRegistry.registerKeySchema("test-topic", valueSchema); + + final Integer version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", valueSchema); + assertThat(version).isEqualTo(1); + } + + @Test + void shouldReturnValueSchemaId() throws IOException, RestClientException { + final Schema valueSchema = createSchema("value_schema"); + this.schemaRegistry.registerValueSchema("test-topic", valueSchema); + + final Integer id = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", valueSchema); + assertThat(id).isEqualTo(1); + } + + @Test + void shouldReturnKeySchemaId() throws IOException, RestClientException { + final Schema valueSchema = createSchema("value_schema"); + this.schemaRegistry.registerKeySchema("test-topic", valueSchema); + + final Integer id = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-key", valueSchema); + assertThat(id).isEqualTo(1); + } + + @Test + void shouldNotReturnVersionForNonExistingSchema() { + final Schema test = createSchema("test"); + final Schema other = createSchema("other"); + this.schemaRegistry.registerValueSchema("test-topic", other); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", test)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Schema not found; error code: 40403"); + } + + @Test + void shouldNotReturnIdForNonExistingSchema() { + final Schema test = createSchema("test"); + final Schema other = createSchema("other"); + this.schemaRegistry.registerValueSchema("test-topic", other); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", test)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Schema not found; error code: 40403"); + } + + @Test + void shouldNotReturnVersionForNonExistingSubject() { + final Schema test = createSchema("test"); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", test)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Subject not found; error code: 40401"); + } + + @Test + void shouldNotReturnIdForNonExistingSubject() { + final Schema test = createSchema("test"); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", test)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Subject not found; error code: 40401"); + } + private static Schema createSchema(final String name) { return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); }