diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java
index cfba321..5c14d10 100644
--- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java
+++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java
@@ -23,6 +23,7 @@
*/
package com.bakdata.schemaregistrymock;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import com.github.tomakehurst.wiremock.WireMockServer;
@@ -36,12 +37,14 @@
import com.github.tomakehurst.wiremock.http.ResponseDefinition;
import com.github.tomakehurst.wiremock.matching.UrlPathPattern;
import com.github.tomakehurst.wiremock.matching.UrlPattern;
+import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
@@ -54,11 +57,14 @@
/**
*
The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.
- * In particular,
+ * In particular, you can
*
- * - you can register a schema
- * - retrieve a schema by id.
+ * - register a schema
+ * - retrieve a schema by id
* - list and get schema versions of a subject
+ * - list all subjects
+ * - delete a subject
+ * - retrieve version and id of a schema
*
*
* If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at
@@ -96,6 +102,7 @@ public class SchemaRegistryMock {
private static final String ALL_SUBJECT_PATTERN = "/subjects";
private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions";
private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/";
+ private static final String SPECIFIC_SCHEMA_PATTERN = "/subjects/[^/]+";
private static final int IDENTITY_MAP_CAPACITY = 1000;
private final ListVersionsHandler listVersionsHandler = new ListVersionsHandler();
@@ -103,10 +110,11 @@ public class SchemaRegistryMock {
private final AutoRegistrationHandler autoRegistrationHandler = new AutoRegistrationHandler();
private final DeleteSubjectHandler deleteSubjectHandler = new DeleteSubjectHandler();
private final AllSubjectsHandler allSubjectsHandler = new AllSubjectsHandler();
+ private final SchemaHandler schemaHandler = new SchemaHandler();
private final WireMockServer mockSchemaRegistry = new WireMockServer(
WireMockConfiguration.wireMockConfig().dynamicPort()
.extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler,
- this.deleteSubjectHandler, this.allSubjectsHandler));
+ this.deleteSubjectHandler, this.allSubjectsHandler, this.schemaHandler));
private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
private static UrlPattern getSchemaPattern(final Integer id) {
@@ -139,6 +147,8 @@ public void start() {
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName())));
+ this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlPathMatching(SPECIFIC_SCHEMA_PATTERN))
+ .willReturn(WireMock.aResponse().withTransformers(this.schemaHandler.getName())));
}
public void stop() {
@@ -234,8 +244,8 @@ private Collection listAllSubjects() {
}
private abstract static class SubjectsHandler extends ResponseDefinitionTransformer {
- // Expected url pattern /subjects(/.*-value/versions)
- protected final Splitter urlSplitter = Splitter.on('/').omitEmptyStrings();
+ // Expected url pattern /subjects(/.*-value/(versions|?param))
+ protected final Splitter urlSplitter = Splitter.on(CharMatcher.anyOf("/?")).omitEmptyStrings();
@Override
public boolean applyGlobally() {
@@ -336,4 +346,56 @@ public String getName() {
return AllSubjectsHandler.class.getSimpleName();
}
}
+
+ /**
+ * The SchemaHandler returns version and id for a given schema.
+ *
+ * Note: It returns "Schema not found, Error code 40403" for a deleted schema even if the request parameter
+ * 'deleted' is set to true. The MockSchemaRegistryClient does not save the version of a deleted schema.
+ */
+ private class SchemaHandler extends SubjectsHandler {
+ @Override
+ public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
+ final FileSource files, final Parameters parameters) {
+ final String requestSubject = this.getSubject(request);
+ // Check if requestSubject exists. This is required because the mock always throws an exception
+ // with 'Schema not found'
+ final boolean subjectExists = SchemaRegistryMock.this.listAllSubjects().stream()
+ .anyMatch(subject -> subject.equals(requestSubject));
+
+ if (!subjectExists) {
+ final ErrorMessage error = new ErrorMessage(40401, "Subject not found");
+ return ResponseDefinitionBuilder.jsonResponse(error, HTTP_NOT_FOUND);
+ }
+
+ final Schema schema;
+ try {
+ schema = new Schema.Parser()
+ .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema());
+ } catch (final IOException e) {
+ final ErrorMessage error =
+ new ErrorMessage(HTTP_INTERNAL_ERROR, "Error while looking up schema under subject topic");
+ return ResponseDefinitionBuilder.jsonResponse(error, HTTP_INTERNAL_ERROR);
+ }
+
+ try {
+ final int schemaId = SchemaRegistryMock.this.schemaRegistryClient.getId(requestSubject, schema);
+ final int schemaVersion =
+ SchemaRegistryMock.this.schemaRegistryClient.getVersion(requestSubject, schema);
+ return ResponseDefinitionBuilder
+ .jsonResponse(new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(
+ requestSubject, schemaVersion, schemaId, schema.toString()));
+ } catch (final RestClientException | IOException e) {
+ final ErrorMessage error = new ErrorMessage(40403, "Schema not found");
+ return ResponseDefinitionBuilder.jsonResponse(error, HTTP_NOT_FOUND);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return SchemaHandler.class.getSimpleName();
+ }
+
+ }
+
}
diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java
index f2a1c9f..b98cebe 100644
--- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java
+++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java
@@ -26,6 +26,7 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
@@ -97,7 +98,8 @@ void shouldHaveSchemaVersions() throws IOException, RestClientException {
final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
assertThat(versions.size()).isOne();
- final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0));
+ final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient()
+ .getSchemaMetadata(topic + "-value", versions.get(0));
assertThat(metadata.getId()).isEqualTo(id);
final String schemaString = metadata.getSchema();
final Schema retrievedSchema = new Schema.Parser().parse(schemaString);
@@ -128,7 +130,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException {
final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
assertThat(versions.size()).isEqualTo(2);
- final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value");
+ final SchemaMetadata metadata =
+ this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value");
final int metadataId = metadata.getId();
assertThat(metadataId).isNotEqualTo(id1);
assertThat(metadataId).isEqualTo(id2);
@@ -140,7 +143,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException {
@Test
void shouldNotHaveLatestSchemaVersionForUnknownSubject() {
assertThatExceptionOfType(RestClientException.class)
- .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist"))
+ .isThrownBy(
+ () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}
@@ -211,32 +215,109 @@ void shouldNotDeleteUnknownSubject() {
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}
-
@Test
void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
final String topic = "test-topic";
final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema);
- final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
+ final SchemaRegistryClient schemaRegistryClient = this.schemaRegistry.getSchemaRegistryClient();
+ final List versions = schemaRegistryClient.getAllVersions(topic + "-value");
assertThat(versions.size()).isOne();
- final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0));
+ final SchemaMetadata metadata = schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0));
assertThat(metadata.getId()).isEqualTo(id);
- assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"))
+ assertThat(schemaRegistryClient.getLatestSchemaMetadata(topic + "-value"))
.isNotNull();
this.schemaRegistry.deleteValueSchema(topic);
assertThatExceptionOfType(RestClientException.class)
- .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"))
+ .isThrownBy(() -> schemaRegistryClient.getAllVersions(topic + "-value"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
assertThatExceptionOfType(RestClientException.class)
- .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)))
+ .isThrownBy(() -> schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0)))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
assertThatExceptionOfType(RestClientException.class)
- .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"))
+ .isThrownBy(() -> schemaRegistryClient.getLatestSchemaMetadata(topic + "-value"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}
+ @Test
+ void shouldReturnValueSchemaVersion() throws IOException, RestClientException {
+ final Schema valueSchema = createSchema("value_schema");
+ this.schemaRegistry.registerValueSchema("test-topic", valueSchema);
+
+ final Integer version =
+ this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema);
+ assertThat(version).isEqualTo(1);
+ }
+
+ @Test
+ void shouldReturnKeySchemaVersion() throws IOException, RestClientException {
+ final Schema valueSchema = createSchema("value_schema");
+ this.schemaRegistry.registerKeySchema("test-topic", valueSchema);
+
+ final Integer version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", valueSchema);
+ assertThat(version).isEqualTo(1);
+ }
+
+ @Test
+ void shouldReturnValueSchemaId() throws IOException, RestClientException {
+ final Schema valueSchema = createSchema("value_schema");
+ this.schemaRegistry.registerValueSchema("test-topic", valueSchema);
+
+ final Integer id = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", valueSchema);
+ assertThat(id).isEqualTo(1);
+ }
+
+ @Test
+ void shouldReturnKeySchemaId() throws IOException, RestClientException {
+ final Schema valueSchema = createSchema("value_schema");
+ this.schemaRegistry.registerKeySchema("test-topic", valueSchema);
+
+ final Integer id = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-key", valueSchema);
+ assertThat(id).isEqualTo(1);
+ }
+
+ @Test
+ void shouldNotReturnVersionForNonExistingSchema() {
+ final Schema test = createSchema("test");
+ final Schema other = createSchema("other");
+ this.schemaRegistry.registerValueSchema("test-topic", other);
+ assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", test))
+ .isInstanceOfSatisfying(RestClientException.class,
+ e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND))
+ .hasMessage("Schema not found; error code: 40403");
+ }
+
+ @Test
+ void shouldNotReturnIdForNonExistingSchema() {
+ final Schema test = createSchema("test");
+ final Schema other = createSchema("other");
+ this.schemaRegistry.registerValueSchema("test-topic", other);
+ assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", test))
+ .isInstanceOfSatisfying(RestClientException.class,
+ e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND))
+ .hasMessage("Schema not found; error code: 40403");
+ }
+
+ @Test
+ void shouldNotReturnVersionForNonExistingSubject() {
+ final Schema test = createSchema("test");
+ assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", test))
+ .isInstanceOfSatisfying(RestClientException.class,
+ e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND))
+ .hasMessage("Subject not found; error code: 40401");
+ }
+
+ @Test
+ void shouldNotReturnIdForNonExistingSubject() {
+ final Schema test = createSchema("test");
+ assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", test))
+ .isInstanceOfSatisfying(RestClientException.class,
+ e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND))
+ .hasMessage("Subject not found; error code: 40401");
+ }
+
private static Schema createSchema(final String name) {
return Schema.createRecord(name, "no doc", "", false, Collections.emptyList());
}