Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint for specific schema version #37

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*/
package com.bakdata.schemaregistrymock;

import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.github.tomakehurst.wiremock.WireMockServer;
Expand All @@ -36,12 +37,14 @@
import com.github.tomakehurst.wiremock.http.ResponseDefinition;
import com.github.tomakehurst.wiremock.matching.UrlPathPattern;
import com.github.tomakehurst.wiremock.matching.UrlPattern;
import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
Expand All @@ -54,11 +57,14 @@

/**
* <p>The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.</p>
* In particular,
* In particular, you can
* <ul>
* <li>you can register a schema</li>
* <li>retrieve a schema by id.</li>
* <li>register a schema</li>
* <li>retrieve a schema by id</li>
* <li>list and get schema versions of a subject</li>
* <li>list all subjects</li>
* <li>delete a subject</li>
* <li>retrieve version and id of a schema</li>
* </ul>
*
* <p>If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at
Expand Down Expand Up @@ -96,17 +102,19 @@ public class SchemaRegistryMock {
private static final String ALL_SUBJECT_PATTERN = "/subjects";
private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions";
private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/";
private static final String SPECIFIC_SCHEMA_PATTERN = "/subjects/[^/]+";
private static final int IDENTITY_MAP_CAPACITY = 1000;

private final ListVersionsHandler listVersionsHandler = new ListVersionsHandler();
private final GetVersionHandler getVersionHandler = new GetVersionHandler();
private final AutoRegistrationHandler autoRegistrationHandler = new AutoRegistrationHandler();
private final DeleteSubjectHandler deleteSubjectHandler = new DeleteSubjectHandler();
private final AllSubjectsHandler allSubjectsHandler = new AllSubjectsHandler();
private final SchemaHandler schemaHandler = new SchemaHandler();
private final WireMockServer mockSchemaRegistry = new WireMockServer(
WireMockConfiguration.wireMockConfig().dynamicPort()
.extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler,
this.deleteSubjectHandler, this.allSubjectsHandler));
this.deleteSubjectHandler, this.allSubjectsHandler, this.schemaHandler));
private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();

private static UrlPattern getSchemaPattern(final Integer id) {
Expand Down Expand Up @@ -139,6 +147,8 @@ public void start() {
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName())));
this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlPathMatching(SPECIFIC_SCHEMA_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.schemaHandler.getName())));
}

public void stop() {
Expand Down Expand Up @@ -234,8 +244,8 @@ private Collection<String> listAllSubjects() {
}

private abstract static class SubjectsHandler extends ResponseDefinitionTransformer {
// Expected url pattern /subjects(/.*-value/versions)
protected final Splitter urlSplitter = Splitter.on('/').omitEmptyStrings();
// Expected url pattern /subjects(/.*-value/(versions|?param))
protected final Splitter urlSplitter = Splitter.on(CharMatcher.anyOf("/?")).omitEmptyStrings();

@Override
public boolean applyGlobally() {
Expand Down Expand Up @@ -336,4 +346,56 @@ public String getName() {
return AllSubjectsHandler.class.getSimpleName();
}
}

/**
* The SchemaHandler returns version and id for a given schema.
*
* Note: It returns "Schema not found, Error code 40403" for a deleted schema even if the request parameter
* 'deleted' is set to true. The MockSchemaRegistryClient does not save the version of a deleted schema.
*/
private class SchemaHandler extends SubjectsHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
final String requestSubject = this.getSubject(request);
// Check if requestSubject exists. This is required because the mock always throws an exception
// with 'Schema not found'
final boolean subjectExists = SchemaRegistryMock.this.listAllSubjects().stream()
.anyMatch(subject -> subject.equals(requestSubject));

if (!subjectExists) {
final ErrorMessage error = new ErrorMessage(40401, "Subject not found");
return ResponseDefinitionBuilder.jsonResponse(error, HTTP_NOT_FOUND);
}

final Schema schema;
try {
schema = new Schema.Parser()
.parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema());
} catch (final IOException e) {
final ErrorMessage error =
new ErrorMessage(HTTP_INTERNAL_ERROR, "Error while looking up schema under subject topic");
return ResponseDefinitionBuilder.jsonResponse(error, HTTP_INTERNAL_ERROR);
}

try {
final int schemaId = SchemaRegistryMock.this.schemaRegistryClient.getId(requestSubject, schema);
final int schemaVersion =
SchemaRegistryMock.this.schemaRegistryClient.getVersion(requestSubject, schema);
return ResponseDefinitionBuilder
.jsonResponse(new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(
requestSubject, schemaVersion, schemaId, schema.toString()));
} catch (final RestClientException | IOException e) {
final ErrorMessage error = new ErrorMessage(40403, "Schema not found");
return ResponseDefinitionBuilder.jsonResponse(error, HTTP_NOT_FOUND);
}
}

@Override
public String getName() {
return SchemaHandler.class.getSimpleName();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand Down Expand Up @@ -97,7 +98,8 @@ void shouldHaveSchemaVersions() throws IOException, RestClientException {
final List<Integer> versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
assertThat(versions.size()).isOne();

final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0));
final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient()
.getSchemaMetadata(topic + "-value", versions.get(0));
assertThat(metadata.getId()).isEqualTo(id);
final String schemaString = metadata.getSchema();
final Schema retrievedSchema = new Schema.Parser().parse(schemaString);
Expand Down Expand Up @@ -128,7 +130,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException {
final List<Integer> versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
assertThat(versions.size()).isEqualTo(2);

final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value");
final SchemaMetadata metadata =
this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value");
final int metadataId = metadata.getId();
assertThat(metadataId).isNotEqualTo(id1);
assertThat(metadataId).isEqualTo(id2);
Expand All @@ -140,7 +143,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException {
@Test
void shouldNotHaveLatestSchemaVersionForUnknownSubject() {
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist"))
.isThrownBy(
() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}

Expand Down Expand Up @@ -211,32 +215,109 @@ void shouldNotDeleteUnknownSubject() {
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}


@Test
void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
final String topic = "test-topic";
final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema);

final List<Integer> versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
final SchemaRegistryClient schemaRegistryClient = this.schemaRegistry.getSchemaRegistryClient();
final List<Integer> versions = schemaRegistryClient.getAllVersions(topic + "-value");
assertThat(versions.size()).isOne();

final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0));
final SchemaMetadata metadata = schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0));
assertThat(metadata.getId()).isEqualTo(id);
assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"))
assertThat(schemaRegistryClient.getLatestSchemaMetadata(topic + "-value"))
.isNotNull();
this.schemaRegistry.deleteValueSchema(topic);
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"))
.isThrownBy(() -> schemaRegistryClient.getAllVersions(topic + "-value"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)))
.isThrownBy(() -> schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0)))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"))
.isThrownBy(() -> schemaRegistryClient.getLatestSchemaMetadata(topic + "-value"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}

@Test
void shouldReturnValueSchemaVersion() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
this.schemaRegistry.registerValueSchema("test-topic", valueSchema);

final Integer version =
this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema);
assertThat(version).isEqualTo(1);
}

@Test
void shouldReturnKeySchemaVersion() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
this.schemaRegistry.registerKeySchema("test-topic", valueSchema);

final Integer version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", valueSchema);
assertThat(version).isEqualTo(1);
}

@Test
void shouldReturnValueSchemaId() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
this.schemaRegistry.registerValueSchema("test-topic", valueSchema);

final Integer id = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", valueSchema);
assertThat(id).isEqualTo(1);
}

@Test
void shouldReturnKeySchemaId() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
this.schemaRegistry.registerKeySchema("test-topic", valueSchema);

final Integer id = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-key", valueSchema);
assertThat(id).isEqualTo(1);
}

@Test
void shouldNotReturnVersionForNonExistingSchema() {
final Schema test = createSchema("test");
final Schema other = createSchema("other");
this.schemaRegistry.registerValueSchema("test-topic", other);
assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", test))
.isInstanceOfSatisfying(RestClientException.class,
e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND))
.hasMessage("Schema not found; error code: 40403");
}

@Test
void shouldNotReturnIdForNonExistingSchema() {
final Schema test = createSchema("test");
final Schema other = createSchema("other");
this.schemaRegistry.registerValueSchema("test-topic", other);
assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", test))
.isInstanceOfSatisfying(RestClientException.class,
e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND))
.hasMessage("Schema not found; error code: 40403");
}

@Test
void shouldNotReturnVersionForNonExistingSubject() {
final Schema test = createSchema("test");
assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", test))
.isInstanceOfSatisfying(RestClientException.class,
e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND))
.hasMessage("Subject not found; error code: 40401");
}

@Test
void shouldNotReturnIdForNonExistingSubject() {
final Schema test = createSchema("test");
assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", test))
.isInstanceOfSatisfying(RestClientException.class,
e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND))
.hasMessage("Subject not found; error code: 40401");
}

private static Schema createSchema(final String name) {
return Schema.createRecord(name, "no doc", "", false, Collections.emptyList());
}
Expand Down