Skip to content

Commit

Permalink
adds further test for source streams
Browse files Browse the repository at this point in the history
  • Loading branch information
grimmjo committed Oct 26, 2023
1 parent 8505a99 commit 70b7730
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 73 deletions.
61 changes: 61 additions & 0 deletions nats/src/main/java/io/micronaut/nats/connect/External.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.nats.connect;

/**
* External.
*
* @author Joachim Grimm
* @since 4.1.0
*/
public abstract class External {

private String api;
private String deliver;

/**
* build the external information from the given configuration.
*
* @return this
*/
public io.nats.client.api.External build() {
return io.nats.client.api.External.builder().api(api).deliver(deliver).build();
}

/**
* Api.
* @param api {@link String}
*/
public void setApi(String api) {
this.api = api;
}

/**
* Deliver.
* @param deliver {@link String}
*/
public void setDeliver(String deliver) {
this.deliver = deliver;
}

final String getApi() {
return api;
}

final String getDeliver() {
return deliver;
}
}
11 changes: 7 additions & 4 deletions nats/src/main/java/io/micronaut/nats/connect/Mirror.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @author Joachim Grimm
* @since 4.1.0
*/
public abstract class Mirror<T extends SubjectTransformBase, E extends SourceBase.External> extends SourceBase<T, E> {
public abstract class Mirror<T extends SubjectTransformBase, E extends External> extends SourceBase<T, E> {

/**
* build the mirror object from the given configuration.
Expand All @@ -35,9 +35,12 @@ public io.nats.client.api.Mirror build() {
.name(name)
.filterSubject(filterSubject)
.startSeq(startSeq)
.startTime(startTime)
.subjectTransforms(subjectTransforms.stream().map(SubjectTransformBase::build)
.toList());
.startTime(startTime);
if (subjectTransforms != null) {
builder = builder
.subjectTransforms(subjectTransforms.stream().map(SubjectTransformBase::build)
.toList());
}
if (external != null) {
builder = builder.external(external.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,14 @@ public void setCertificatePath(String certificatePath) {
private SSLContext createTlsContext() throws IOException, GeneralSecurityException {
SSLContext ctx = SSLContext.getInstance(DEFAULT_SSL_PROTOCOL);

TrustManagerFactory factory = TrustManagerFactory.getInstance(Optional.ofNullable(trustStoreType).orElse("SunX509"));
TrustManagerFactory factory = TrustManagerFactory.getInstance(Optional.ofNullable(trustStoreType)
.orElse("SunX509"));
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
if (trustStorePath != null && !trustStorePath.isEmpty()) {
try (BufferedInputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(trustStorePath)))) {
ks.load(in, Optional.ofNullable(trustStorePassword).map(String::toCharArray).orElse(new char[0]));
ks.load(in, Optional.ofNullable(trustStorePassword)
.map(String::toCharArray)
.orElse(new char[0]));
}
} else {
ks.load(null);
Expand Down Expand Up @@ -454,7 +457,7 @@ public static class StreamConfiguration {

private Mirror mirror;

private List<Source> sources = new ArrayList<>();
private List<Source> sources;

private Republish republish;

Expand All @@ -481,9 +484,12 @@ public io.nats.client.api.StreamConfiguration.Builder getBuilder() {
*/
public io.nats.client.api.StreamConfiguration toStreamConfiguration() {
io.nats.client.api.StreamConfiguration.Builder streamBuilder = builder.name(name)
.subjects(subjects)
.sources(sources.stream().map(io.micronaut.nats.connect.Source::build).toList());

.subjects(subjects);
if (sources != null) {
streamBuilder = streamBuilder.sources(sources.stream()
.map(io.micronaut.nats.connect.Source::build)
.toList());
}
if (mirror != null) {
streamBuilder = streamBuilder.mirror(mirror.build());
}
Expand Down Expand Up @@ -558,6 +564,7 @@ public void setRepublish(Republish republish) {

/**
* Consumer Limits.
*
* @param consumerLimits {@link ConsumerLimits}
*/
public void setConsumerLimits(ConsumerLimits consumerLimits) {
Expand Down Expand Up @@ -611,7 +618,7 @@ public static class ConsumerLimits extends io.micronaut.nats.connect.ConsumerLim
* @since 4.1.0
*/
@ConfigurationProperties("mirror")
public static class Mirror extends io.micronaut.nats.connect.Mirror<SubjectTransformBase, Mirror.External> {
public static class Mirror extends io.micronaut.nats.connect.Mirror<Mirror.SubjectTransform, Mirror.External> {

/**
* Subject transformations.
Expand All @@ -621,7 +628,6 @@ public static class Mirror extends io.micronaut.nats.connect.Mirror<SubjectTrans
*/
@EachProperty(value = "subject-transforms", list = true)
public static class SubjectTransform extends SubjectTransformBase {

}

/**
Expand All @@ -631,7 +637,7 @@ public static class SubjectTransform extends SubjectTransformBase {
* @since 4.1.0
*/
@ConfigurationProperties("external")
public static class External extends SourceBase.External {
public static class External extends io.micronaut.nats.connect.External {

}
}
Expand All @@ -643,17 +649,16 @@ public static class External extends SourceBase.External {
* @since 4.1.0
*/
@EachProperty(value = "sources", list = true)
public static class Source extends io.micronaut.nats.connect.Source<SubjectTransform, Mirror.External> {
public static class Source extends io.micronaut.nats.connect.Source<Source.SubjectTransform, Source.External> {

/**
* Subject transformations.
*
* @author Joachim Grimm
* @since 4.1.0
*/
@EachProperty(value = "subject-transforms", list = true)
@EachProperty(value = "subjectTransforms", list = true)
public static class SubjectTransform extends SubjectTransformBase {

}

/**
Expand All @@ -663,7 +668,7 @@ public static class SubjectTransform extends SubjectTransformBase {
* @since 4.1.0
*/
@ConfigurationProperties("external")
public static class External extends SourceBase.External {
public static class External extends io.micronaut.nats.connect.External {

}
}
Expand All @@ -683,7 +688,7 @@ public static class KeyValueConfiguration {

private Mirror mirror;

private List<Source> sources = new ArrayList<>();
private List<Source> sources;

@ConfigurationBuilder(prefixes = "", excludes = {"addSources", "addSource", "name", "sources", "build", "placement", "republish", "mirror"})
private io.nats.client.api.KeyValueConfiguration.Builder builder = io.nats.client.api.KeyValueConfiguration.builder();
Expand Down Expand Up @@ -711,8 +716,13 @@ public io.nats.client.api.KeyValueConfiguration.Builder getBuilder() {
*/
public io.nats.client.api.KeyValueConfiguration toKeyValueConfiguration() {
io.nats.client.api.KeyValueConfiguration.Builder keyValueBuilder = builder
.name(name)
.sources(sources.stream().map(io.micronaut.nats.connect.Source::build).toList());
.name(name);
if (sources != null) {
keyValueBuilder = keyValueBuilder
.sources(sources.stream()
.map(io.micronaut.nats.connect.Source::build)
.toList());
}
if (mirror != null) {
keyValueBuilder = keyValueBuilder.mirror(mirror.build());
}
Expand Down Expand Up @@ -788,7 +798,7 @@ public static class Republish extends io.micronaut.nats.connect.Republish {
* @since 4.1.0
*/
@ConfigurationProperties("mirror")
public static class Mirror extends io.micronaut.nats.connect.Mirror<SubjectTransformBase, Mirror.External> {
public static class Mirror extends io.micronaut.nats.connect.Mirror<Mirror.SubjectTransform, Mirror.External> {

/**
* Subject transformations.
Expand All @@ -798,7 +808,6 @@ public static class Mirror extends io.micronaut.nats.connect.Mirror<SubjectTrans
*/
@EachProperty(value = "subject-transforms", list = true)
public static class SubjectTransform extends SubjectTransformBase {

}

/**
Expand All @@ -808,7 +817,7 @@ public static class SubjectTransform extends SubjectTransformBase {
* @since 4.1.0
*/
@ConfigurationProperties("external")
public static class External extends SourceBase.External {
public static class External extends io.micronaut.nats.connect.External {

}
}
Expand All @@ -830,7 +839,6 @@ public static class Source extends io.micronaut.nats.connect.Source<Source.Subje
*/
@EachProperty(value = "subject-transforms", list = true)
public static class SubjectTransform extends SubjectTransformBase {

}

/**
Expand All @@ -840,7 +848,7 @@ public static class SubjectTransform extends SubjectTransformBase {
* @since 4.1.0
*/
@ConfigurationProperties("external")
public static class External extends SourceBase.External {
public static class External extends io.micronaut.nats.connect.External {

}
}
Expand Down
12 changes: 8 additions & 4 deletions nats/src/main/java/io/micronaut/nats/connect/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @author Joachim Grimm
* @since 4.1.0
*/
public abstract class Source<T extends SubjectTransformBase, E extends SourceBase.External> extends SourceBase<T, E> {
public abstract class Source<T extends SubjectTransformBase, E extends External> extends SourceBase<T, E> {

/**
* build the source object from the given configuration.
Expand All @@ -35,9 +35,13 @@ public io.nats.client.api.Source build() {
.name(name)
.filterSubject(filterSubject)
.startSeq(startSeq)
.startTime(startTime)
.subjectTransforms(subjectTransforms.stream().map(SubjectTransformBase::build)
.toList());
.startTime(startTime);
if (subjectTransforms != null) {
builder = builder
.subjectTransforms(subjectTransforms.stream().map(SubjectTransformBase::build)
.toList());
}

if (external != null) {
builder = builder.external(external.build());
}
Expand Down
51 changes: 7 additions & 44 deletions nats/src/main/java/io/micronaut/nats/connect/SourceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,21 @@
*/
package io.micronaut.nats.connect;

import io.micronaut.context.annotation.EachProperty;
import io.micronaut.core.annotation.NonNull;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

abstract class SourceBase<T extends SubjectTransformBase, E extends SourceBase.External> {
abstract class SourceBase<T extends SubjectTransformBase, E extends External> {

protected String name;
protected long startSeq = 0;
protected ZonedDateTime startTime;
protected String filterSubject;
protected E external;
protected List<T> subjectTransforms = new ArrayList<>();

protected List<T> subjectTransforms;

/**
* Name.
*
* @param name {@link String}
*/
public void setName(String name) {
Expand All @@ -42,6 +38,7 @@ public void setName(String name) {

/**
* Start sequence.
*
* @param startSeq long
*/
public void setStartSeq(long startSeq) {
Expand All @@ -50,6 +47,7 @@ public void setStartSeq(long startSeq) {

/**
* Start time.
*
* @param startTime {@link ZonedDateTime}
*/
public void setStartTime(ZonedDateTime startTime) {
Expand All @@ -58,6 +56,7 @@ public void setStartTime(ZonedDateTime startTime) {

/**
* Filter subject.
*
* @param filterSubject {@link String}
*/
public void setFilterSubject(String filterSubject) {
Expand All @@ -68,7 +67,7 @@ public void setFilterSubject(String filterSubject) {
* Subject transformations.
* @param subjectTransforms list
*/
public void setSubjectTransforms(@NonNull List<T> subjectTransforms) {
public void setSubjectTransforms(List<T> subjectTransforms) {
this.subjectTransforms = subjectTransforms;
}

Expand All @@ -80,40 +79,4 @@ public void setExternal(E external) {
this.external = external;
}

@EachProperty(value = "subject-transforms", list = true)
public static class SubjectTransform extends SubjectTransformBase {
}

/**
* External.
*
* @author Joachim Grimm
* @since 4.1.0
*/
public static class External {

private String api;
private String deliver;

public io.nats.client.api.External build() {
return io.nats.client.api.External.builder().api(api).deliver(deliver).build();
}

/**
* Api.
* @param api {@link String}
*/
public void setApi(String api) {
this.api = api;
}

/**
* Deliver.
* @param deliver {@link String}
*/
public void setDeliver(String deliver) {
this.deliver = deliver;
}
}

}
Loading

0 comments on commit 70b7730

Please sign in to comment.