From 22a7dbb554ba1bfdb9dc1bc0a89c3a3df9f03b1c Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Fri, 20 Sep 2024 22:12:00 -0500 Subject: [PATCH 01/15] client boilerplate --- .../consys/client/ConSysApiClient.java | 29 +++++-- .../consys/client/ConSysApiClientConfig.java | 45 ++++++++++ .../client/ConSysApiClientDescriptor.java | 37 ++++++++ .../consys/client/ConSysApiClientModule.java | 85 +++++++++++++++++++ .../deployment/DeploymentBindingHtml.java | 15 ++-- .../org.sensorhub.api.module.IModuleProvider | 3 +- 6 files changed, 201 insertions(+), 13 deletions(-) create mode 100644 sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java create mode 100644 sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java create mode 100644 sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index e6ac69869..935a28928 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -46,6 +46,7 @@ import org.sensorhub.api.system.ISystemWithDesc; import org.sensorhub.impl.service.consys.ResourceParseException; import org.sensorhub.impl.service.consys.obs.DataStreamBindingJson; +import org.sensorhub.impl.service.consys.obs.ObsBindingOmJson; import org.sensorhub.impl.service.consys.resource.RequestContext; import org.sensorhub.impl.service.consys.resource.ResourceFormat; import org.sensorhub.impl.service.consys.resource.ResourceLink; @@ -205,7 +206,7 @@ public CompletableFuture addDataStream(String systemId, IDataStreamInfo var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - var binding = new DataStreamBindingJson(ctx, null, false, Collections.emptyMap()); + var binding = new DataStreamBindingJson(ctx, null, null, false, Collections.emptyMap()); binding.serializeCreate(datastream); return sendPostRequest( @@ -233,7 +234,7 @@ public CompletableFuture> addDataStreams(String systemId, Collection var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - var binding = new DataStreamBindingJson(ctx, null, false, Collections.emptyMap()) { + var binding = new DataStreamBindingJson(ctx, null, null, false, Collections.emptyMap()) { protected void startJsonCollection(JsonWriter writer) throws IOException { writer.beginArray(); @@ -274,7 +275,7 @@ public CompletableFuture addControlStream(String systemId, ICommandStrea var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - var binding = new CommandStreamBindingJson(ctx, null, false); + var binding = new CommandStreamBindingJson(ctx, null, null, false); binding.serializeCreate(cmdstream); return sendPostRequest( @@ -302,7 +303,7 @@ public CompletableFuture> addControlStreams(String systemId, Collect var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - var binding = new CommandStreamBindingJson(ctx, null, false) { + var binding = new CommandStreamBindingJson(ctx, null, null, false) { protected void startJsonCollection(JsonWriter writer) throws IOException { writer.beginArray(); @@ -336,9 +337,25 @@ protected void endJsonCollection(JsonWriter writer, Collection lin /* Observations */ /*--------------*/ - public CompletableFuture pushObs(String datastreamId, IObsData cmd) + public CompletableFuture pushObs(String datastreamId, IObsData obs) { - return null; + try + { + var buffer = new InMemoryBufferStreamHandler(); + var ctx = new RequestContext(buffer); + + var binding = new ObsBindingOmJson(ctx, null, false, null); + binding.serializeCreate(obs); + + return sendPostRequest( + endpoint.resolve(DATASTREAMS_COLLECTION + "/" + datastreamId), + ResourceFormat.JSON, + buffer); + } + catch (IOException e) + { + throw new IllegalStateException("Error initializing binding", e); + } } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java new file mode 100644 index 000000000..86cdc091d --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java @@ -0,0 +1,45 @@ +package org.sensorhub.impl.service.consys.client; + +import org.sensorhub.api.client.ClientConfig; +import org.sensorhub.api.config.DisplayInfo; +import org.sensorhub.impl.comm.HTTPConfig; +import org.sensorhub.impl.comm.RobustIPConnectionConfig; +import org.sensorhub.impl.datastore.view.ObsSystemDatabaseViewConfig; + +public class ConSysApiClientConfig extends ClientConfig { + + @DisplayInfo(desc="Filtered view to select systems/datastreams to register with Connected Systems") + @DisplayInfo.Required + public ObsSystemDatabaseViewConfig dataSourceSelector; + + + @DisplayInfo(label="Connected Systems Endpoint", desc="Connected Systems endpoint where the requests are sent") + public HTTPConfig conSys = new HTTPConfig(); + + + @DisplayInfo(label="Connection Options") + public ConSysConnectionConfig connection = new ConSysConnectionConfig(); + + + public static class ConSysConnectionConfig extends RobustIPConnectionConfig + { + @DisplayInfo(desc="Enable to use a persistent HTTP connection for InsertResult") + public boolean usePersistentConnection; + + + @DisplayInfo(desc="Maximum number of records in upload queue (used to compensate for variable bandwidth)") + public int maxQueueSize = 10; + + + @DisplayInfo(desc="Maximum number of stream errors before we try to reconnect to remote server") + public int maxConnectErrors = 10; + } + + + public ConSysApiClientConfig() + { + this.moduleClass = ConSysApiClientModule.class.getCanonicalName(); + this.conSys.resourcePath = "/sensorhub/api"; + } + +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java new file mode 100644 index 000000000..14397d48b --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java @@ -0,0 +1,37 @@ +package org.sensorhub.impl.service.consys.client; + +import org.sensorhub.api.module.IModule; +import org.sensorhub.api.module.IModuleProvider; +import org.sensorhub.api.module.ModuleConfig; +import org.sensorhub.impl.module.JarModuleProvider; + +public class ConSysApiClientDescriptor extends JarModuleProvider implements IModuleProvider { + + @Override + public String getModuleName() + { + return "Connected Systems Client"; + } + + + @Override + public String getModuleDescription() + { + return "Connected Systems client for pushing observations to a remote SensorHub"; + } + + + @Override + public Class> getModuleClass() + { + return ConSysApiClientModule.class; + } + + + @Override + public Class getModuleConfigClass() + { + return ConSysApiClientConfig.class; + } + +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java new file mode 100644 index 000000000..4412b7da2 --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -0,0 +1,85 @@ +package org.sensorhub.impl.service.consys.client; + +import org.sensorhub.api.client.IClientModule; +import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.api.database.IObsSystemDatabase; +import org.sensorhub.impl.module.AbstractModule; +import org.sensorhub.impl.module.RobustConnection; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; + +public class ConSysApiClientModule extends AbstractModule implements IClientModule { + + RobustConnection connection; + IObsSystemDatabase dataBaseView; + String apiEndpointUrl; + ConSysApiClient client; + + public ConSysApiClientModule() + { + this.startAsync = true; + } + + protected void checkConfiguration() throws SensorHubException + { + // TODO check config + } + + @Override + protected void doInit() throws SensorHubException + { + checkConfiguration(); + + this.dataBaseView = config.dataSourceSelector.getFilteredView(getParentHub()); + + this.client = new ConSysApiClient.ConSysApiClientBuilder(apiEndpointUrl) + .simpleAuth(config.conSys.user, config.conSys.password.toCharArray()) + .build(); + + // TODO: Other initialization + } + + @Override + protected void doStart() throws SensorHubException + { + System.out.println(client); + dataBaseView.getSystemDescStore().select(dataBaseView.getSystemDescStore().selectAllFilter()) + .forEach((system) -> { + System.out.println(system.getName()); + var newSys = client.addSystem(system); + System.out.println(newSys); + }); + + // TODO: Ensure connection can be made + + // TODO: Subscribe to system registry + + // TODO: Register systems/subsystems to destination SensorHub + // TODO: Register datastreams to destination + // TODO: Push observations from datastreams + } + + @Override + public boolean isConnected() + { + return false; + } + + @Override + public void setConfiguration(ConSysApiClientConfig config) + { + super.setConfiguration(config); + + String scheme = "http"; + if (config.conSys.enableTLS) + scheme += "s"; + apiEndpointUrl = scheme + "://" + config.conSys.remoteHost + ":" + config.conSys.remotePort; + if (config.conSys.resourcePath != null) + { + if (config.conSys.resourcePath.charAt(0) != '/') + apiEndpointUrl += '/'; + apiEndpointUrl += config.conSys.resourcePath; + } + } +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/deployment/DeploymentBindingHtml.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/deployment/DeploymentBindingHtml.java index 4751e8022..9ae87b574 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/deployment/DeploymentBindingHtml.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/deployment/DeploymentBindingHtml.java @@ -17,6 +17,8 @@ import static j2html.TagCreator.div; import static j2html.TagCreator.iff; import java.io.IOException; + +import j2html.tags.DomContent; import org.sensorhub.api.common.IdEncoders; import org.sensorhub.api.database.IObsSystemDatabase; import org.sensorhub.api.datastore.feature.FeatureKey; @@ -75,12 +77,13 @@ protected String getResourceUrl(FeatureKey key) protected DivTag getLinks(String resourceUrl, FeatureKey key, IDeploymentWithDesc f) { var deplId = idEncoders.getDeploymentIdEncoder().encodeID(key.getInternalID()); - - return div( - iff(assocs.getParentLink(deplId, ResourceFormat.HTML), - link -> getLinkButton("Parent Deployment", link.getHref())), - iff(assocs.getSubdeploymentsLink(deplId, ResourceFormat.HTML), - link -> getLinkButton("Subdeployments", link.getHref())) + + DivTag div = div( + (DomContent) iff(assocs.getParentLink(deplId, ResourceFormat.HTML), + link -> getLinkButton("Parent Deployment", link.getHref())), + (DomContent) iff(assocs.getSubdeploymentsLink(deplId, ResourceFormat.HTML), + link -> getLinkButton("Subdeployments", link.getHref())) ); + return div; } } diff --git a/sensorhub-service-consys/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider b/sensorhub-service-consys/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider index 18a88ed7f..8e42cbcc6 100644 --- a/sensorhub-service-consys/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider +++ b/sensorhub-service-consys/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider @@ -1 +1,2 @@ -org.sensorhub.impl.service.consys.ConSysApiServiceDescriptor \ No newline at end of file +org.sensorhub.impl.service.consys.ConSysApiServiceDescriptor +org.sensorhub.impl.service.consys.client.ConSysApiClientDescriptor \ No newline at end of file From 899678e65777373a06435b84c822f183bd688c46 Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Mon, 23 Sep 2024 20:32:14 -0500 Subject: [PATCH 02/15] Add push observations --- .../consys/client/ConSysApiClient.java | 8 +- .../consys/client/ConSysApiClientModule.java | 144 +++++++++++++++--- 2 files changed, 127 insertions(+), 25 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 935a28928..00619c364 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -43,6 +43,7 @@ import org.sensorhub.api.command.ICommandStreamInfo; import org.sensorhub.api.data.IDataStreamInfo; import org.sensorhub.api.data.IObsData; +import org.sensorhub.api.datastore.obs.IObsStore; import org.sensorhub.api.system.ISystemWithDesc; import org.sensorhub.impl.service.consys.ResourceParseException; import org.sensorhub.impl.service.consys.obs.DataStreamBindingJson; @@ -68,6 +69,7 @@ public class ConSysApiClient static final String SYSTEMS_COLLECTION = "systems"; static final String DATASTREAMS_COLLECTION = "datastreams"; static final String CONTROLS_COLLECTION = "controls"; + static final String OBSERVATIONS_COLLECTION = "observations"; static final String SF_COLLECTION = "fois"; HttpClient http; @@ -337,18 +339,18 @@ protected void endJsonCollection(JsonWriter writer, Collection lin /* Observations */ /*--------------*/ - public CompletableFuture pushObs(String datastreamId, IObsData obs) + public CompletableFuture pushObs(String datastreamId, IObsData obs, IObsStore obsStore) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - var binding = new ObsBindingOmJson(ctx, null, false, null); + var binding = new ObsBindingOmJson(ctx, null, false, obsStore); binding.serializeCreate(obs); return sendPostRequest( - endpoint.resolve(DATASTREAMS_COLLECTION + "/" + datastreamId), + endpoint.resolve(DATASTREAMS_COLLECTION + "/" + datastreamId + "/" + OBSERVATIONS_COLLECTION), ResourceFormat.JSON, buffer); } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index 4412b7da2..8f2911103 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -1,13 +1,30 @@ package org.sensorhub.impl.service.consys.client; import org.sensorhub.api.client.IClientModule; +import org.sensorhub.api.common.BigId; import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.api.data.DataStreamInfo; +import org.sensorhub.api.data.IDataStreamInfo; +import org.sensorhub.api.data.ObsEvent; import org.sensorhub.api.database.IObsSystemDatabase; +import org.sensorhub.api.datastore.obs.DataStreamFilter; +import org.sensorhub.api.datastore.obs.ObsFilter; +import org.sensorhub.api.datastore.system.SystemFilter; +import org.sensorhub.api.system.ISystemWithDesc; import org.sensorhub.impl.module.AbstractModule; import org.sensorhub.impl.module.RobustConnection; - +import org.sensorhub.impl.service.consys.resource.ResourceFormat; +import org.vast.cdm.common.DataStreamWriter; +import org.vast.swe.SWEData; + +import java.net.HttpURLConnection; +import java.net.URI; +import java.util.Map; +import java.util.NavigableMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow; public class ConSysApiClientModule extends AbstractModule implements IClientModule { @@ -15,10 +32,50 @@ public class ConSysApiClientModule extends AbstractModule IObsSystemDatabase dataBaseView; String apiEndpointUrl; ConSysApiClient client; + Map registeredSystems; + NavigableMap dataStreams; + + class SystemRegInfo + { + private Flow.Subscription subscription; + private ISystemWithDesc system; + } + + class StreamInfo + { + private String dataStreamID; + private String topicID; + private String sysUID; + private String outputName; + private Flow.Subscription subscription; + private HttpURLConnection connection; + private DataStreamWriter persistentWriter; + private volatile boolean connecting = false; + private volatile boolean stopping = false; + } public ConSysApiClientModule() { - this.startAsync = true; +// this.startAsync = true; + this.registeredSystems = new ConcurrentHashMap<>(); + this.dataStreams = new ConcurrentSkipListMap<>(); + } + + @Override + public void setConfiguration(ConSysApiClientConfig config) + { + super.setConfiguration(config); + + String scheme = "http"; + if (config.conSys.enableTLS) + scheme += "s"; + apiEndpointUrl = scheme + "://" + config.conSys.remoteHost + ":" + config.conSys.remotePort; + if (config.conSys.resourcePath != null) + { + if (config.conSys.resourcePath.charAt(0) != '/') + apiEndpointUrl += '/'; + apiEndpointUrl += config.conSys.resourcePath; + } } protected void checkConfiguration() throws SensorHubException @@ -33,7 +90,8 @@ protected void doInit() throws SensorHubException this.dataBaseView = config.dataSourceSelector.getFilteredView(getParentHub()); - this.client = new ConSysApiClient.ConSysApiClientBuilder(apiEndpointUrl) + this.client = ConSysApiClient. + newBuilder(apiEndpointUrl) .simpleAuth(config.conSys.user, config.conSys.password.toCharArray()) .build(); @@ -41,16 +99,49 @@ protected void doInit() throws SensorHubException } @Override - protected void doStart() throws SensorHubException - { - System.out.println(client); + protected void doStart() throws SensorHubException { + // Check if endpoint is available + try{ + client.sendGetRequest(URI.create(apiEndpointUrl), ResourceFormat.JSON, null); + } catch (Exception e) { + reportError("Unable to establish connection to Connected Systems endpoint", e); + } + dataBaseView.getSystemDescStore().select(dataBaseView.getSystemDescStore().selectAllFilter()) .forEach((system) -> { - System.out.println(system.getName()); - var newSys = client.addSystem(system); + String newSys = null; + try { + newSys = client.addSystem(system).get(); + String finalNewSys = newSys; + + dataBaseView.getDataStreamStore().select(new DataStreamFilter.Builder().withSystems(new SystemFilter.Builder().withUniqueIDs(system.getUniqueIdentifier()).build()).build()) + .forEach((datastream) -> { + try { + String newDs = client.addDataStream(finalNewSys, datastream).get(); + System.out.println("Added datastream " + datastream.getOutputName() + ". Now pushing observations..."); + if(dataBaseView.getObservationStore() == null) { + System.out.println("Observation store does not exist, continuing"); + return; + } + dataBaseView.getObservationStore().select( + new ObsFilter.Builder().withDataStreams( + new DataStreamFilter.Builder().withOutputNames(datastream.getOutputName()) + .build()) + .build()) + .forEach((obs) -> { + client.pushObs(newDs, obs, dataBaseView.getObservationStore()); + }); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } System.out.println(newSys); }); + // TODO: Check if system exists // TODO: Ensure connection can be made // TODO: Subscribe to system registry @@ -60,26 +151,35 @@ protected void doStart() throws SensorHubException // TODO: Push observations from datastreams } + protected void registerSystemDataStreams(SystemRegInfo system) + { + dataBaseView.getDataStreamStore().select( + new DataStreamFilter.Builder() + .withSystems(new SystemFilter.Builder() + .withUniqueIDs(system.system.getUniqueIdentifier()) + .build()) + .build()) + .forEach((dataStream) -> { +// var streamInfo = registerDataStream(dataStream); +// dataStreams.put("", streamInfo); + }); + } + + protected void registerDataStream(IDataStreamInfo dataStream) + { + + } + @Override public boolean isConnected() { return false; } - @Override - public void setConfiguration(ConSysApiClientConfig config) + protected void handleEvent(final ObsEvent e, StreamInfo streamInfo) { - super.setConfiguration(config); - - String scheme = "http"; - if (config.conSys.enableTLS) - scheme += "s"; - apiEndpointUrl = scheme + "://" + config.conSys.remoteHost + ":" + config.conSys.remotePort; - if (config.conSys.resourcePath != null) - { - if (config.conSys.resourcePath.charAt(0) != '/') - apiEndpointUrl += '/'; - apiEndpointUrl += config.conSys.resourcePath; - } + for(var obs : e.getObservations()) + client.pushObs(streamInfo.dataStreamID, obs, this.dataBaseView.getObservationStore()); } + } From 13f98d09bfd61b789bf13459e030dde0aab89d3e Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Tue, 24 Sep 2024 16:40:50 -0500 Subject: [PATCH 03/15] Added observation subscribers and subsystems support --- .../consys/client/ConSysApiClient.java | 22 +++ .../consys/client/ConSysApiClientModule.java | 173 ++++++++++++++---- 2 files changed, 156 insertions(+), 39 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 00619c364..0e872b574 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -70,6 +70,7 @@ public class ConSysApiClient static final String DATASTREAMS_COLLECTION = "datastreams"; static final String CONTROLS_COLLECTION = "controls"; static final String OBSERVATIONS_COLLECTION = "observations"; + static final String SUBSYSTEMS_COLLECTION = "subsystems"; static final String SF_COLLECTION = "fois"; HttpClient http; @@ -152,6 +153,27 @@ public CompletableFuture addSystem(ISystemWithDesc system) throw new IllegalStateException("Error initializing binding", e); } } + + public CompletableFuture addSubSystem(String systemID, ISystemWithDesc system) + { + try + { + var buffer = new InMemoryBufferStreamHandler(); + var ctx = new RequestContext(buffer); + + var binding = new SystemBindingSmlJson(ctx, null, false); + binding.serialize(null, system, false); + + return sendPostRequest( + endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemID + "/" + SUBSYSTEMS_COLLECTION), + ResourceFormat.SML_JSON, + buffer); + } + catch (IOException e) + { + throw new IllegalStateException("Error initializing binding", e); + } + } public CompletableFuture> addSystems(ISystemWithDesc... systems) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index 8f2911103..2e6d4c6c2 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -1,5 +1,7 @@ package org.sensorhub.impl.service.consys.client; +import org.checkerframework.checker.units.qual.C; +import org.sensorhub.api.client.ClientException; import org.sensorhub.api.client.IClientModule; import org.sensorhub.api.common.BigId; import org.sensorhub.api.common.SensorHubException; @@ -10,6 +12,7 @@ import org.sensorhub.api.datastore.obs.DataStreamFilter; import org.sensorhub.api.datastore.obs.ObsFilter; import org.sensorhub.api.datastore.system.SystemFilter; +import org.sensorhub.api.event.EventUtils; import org.sensorhub.api.system.ISystemWithDesc; import org.sensorhub.impl.module.AbstractModule; import org.sensorhub.impl.module.RobustConnection; @@ -35,16 +38,19 @@ public class ConSysApiClientModule extends AbstractModule Map registeredSystems; NavigableMap dataStreams; - class SystemRegInfo + static class SystemRegInfo { + private String systemID; + private BigId internalID; private Flow.Subscription subscription; private ISystemWithDesc system; } - class StreamInfo + static class StreamInfo { private String dataStreamID; private String topicID; + private BigId internalID; private String sysUID; private String outputName; private Flow.Subscription subscription; @@ -107,40 +113,19 @@ protected void doStart() throws SensorHubException { reportError("Unable to establish connection to Connected Systems endpoint", e); } - dataBaseView.getSystemDescStore().select(dataBaseView.getSystemDescStore().selectAllFilter()) - .forEach((system) -> { - String newSys = null; - try { - newSys = client.addSystem(system).get(); - String finalNewSys = newSys; - - dataBaseView.getDataStreamStore().select(new DataStreamFilter.Builder().withSystems(new SystemFilter.Builder().withUniqueIDs(system.getUniqueIdentifier()).build()).build()) - .forEach((datastream) -> { - try { - String newDs = client.addDataStream(finalNewSys, datastream).get(); - System.out.println("Added datastream " + datastream.getOutputName() + ". Now pushing observations..."); - if(dataBaseView.getObservationStore() == null) { - System.out.println("Observation store does not exist, continuing"); - return; - } - dataBaseView.getObservationStore().select( - new ObsFilter.Builder().withDataStreams( - new DataStreamFilter.Builder().withOutputNames(datastream.getOutputName()) - .build()) - .build()) - .forEach((obs) -> { - client.pushObs(newDs, obs, dataBaseView.getObservationStore()); - }); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - System.out.println(newSys); + dataBaseView.getSystemDescStore().selectEntries( + new SystemFilter.Builder() + .withNoParent() + .build()) + .forEach((entry) -> { + var systemRegInfo = registerSystem(entry.getKey().getInternalID(), entry.getValue()); + checkSubSystems(systemRegInfo); + registerSystemDataStreams(systemRegInfo); }); + for (var stream : dataStreams.values()) + startStream(stream); + // TODO: Check if system exists // TODO: Ensure connection can be made @@ -151,22 +136,132 @@ protected void doStart() throws SensorHubException { // TODO: Push observations from datastreams } + @Override + protected void doStop() throws SensorHubException { + super.doStop(); + + for(var stream : dataStreams.values()) + stopStream(stream); + } + + protected void checkSubSystems(SystemRegInfo parentSystemRegInfo) + { + dataBaseView.getSystemDescStore().selectEntries( + new SystemFilter.Builder() + .withParents(parentSystemRegInfo.internalID) + .build()) + .forEach((entry) -> { + var systemRegInfo = registerSubSystem(entry.getKey().getInternalID(), parentSystemRegInfo, entry.getValue()); + registerSystemDataStreams(systemRegInfo); + }); + } + + protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc system) + { + try { + String systemID = client.addSystem(system).get(); + + SystemRegInfo systemRegInfo = new SystemRegInfo(); + systemRegInfo.systemID = systemID; + systemRegInfo.internalID = systemInternalID; + systemRegInfo.system = system; + return systemRegInfo; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + protected SystemRegInfo registerSubSystem(BigId systemInternalID, SystemRegInfo parentSystem, ISystemWithDesc system) + { + try { + var getParent = client.getSystemById(parentSystem.systemID, ResourceFormat.JSON); + if(getParent == null) + throw new ClientException("Could not retrieve parent system " + parentSystem.systemID); + String systemID = client.addSubSystem(parentSystem.systemID, system).get(); + + SystemRegInfo systemRegInfo = new SystemRegInfo(); + systemRegInfo.systemID = systemID; + systemRegInfo.internalID = systemInternalID; + systemRegInfo.system = system; + return systemRegInfo; + } catch (InterruptedException | ExecutionException | ClientException e) { + throw new RuntimeException(e); + } + } + protected void registerSystemDataStreams(SystemRegInfo system) { - dataBaseView.getDataStreamStore().select( + dataBaseView.getDataStreamStore().selectEntries( new DataStreamFilter.Builder() .withSystems(new SystemFilter.Builder() .withUniqueIDs(system.system.getUniqueIdentifier()) .build()) .build()) - .forEach((dataStream) -> { -// var streamInfo = registerDataStream(dataStream); -// dataStreams.put("", streamInfo); + .forEach((entry) -> { + var streamInfo = registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue()); + dataStreams.put(streamInfo.dataStreamID, streamInfo); }); } - protected void registerDataStream(IDataStreamInfo dataStream) + protected StreamInfo registerDataStream(BigId dsId, String systemID, IDataStreamInfo dataStream) { + var dsTopicId = EventUtils.getDataStreamDataTopicID(dataStream); + + StreamInfo streamInfo = new StreamInfo(); + try { + streamInfo.dataStreamID = client.addDataStream(systemID, dataStream).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + + streamInfo.topicID = dsTopicId; + streamInfo.outputName = dataStream.getOutputName(); + streamInfo.sysUID = dataStream.getSystemID().getUniqueID(); + streamInfo.internalID = dsId; + + return streamInfo; + } + + protected void startStream(StreamInfo streamInfo) throws ClientException + { + try + { + if(streamInfo.subscription != null) + return; + + getParentHub().getEventBus().newSubscription(ObsEvent.class) + .withTopicID(streamInfo.topicID) + .withEventType(ObsEvent.class) + .subscribe(e -> handleEvent(e, streamInfo)) + .thenAccept(subscription -> { + streamInfo.subscription = subscription; + subscription.request(Long.MAX_VALUE); + + // Push latest observation + this.dataBaseView.getObservationStore().select(new ObsFilter.Builder() + .withDataStreams(streamInfo.internalID) + .withLatestResult() + .build()) + .forEach(obs -> + client.pushObs(streamInfo.dataStreamID, obs, this.dataBaseView.getObservationStore())); + + getLogger().info("Starting Connected Systems data push for stream {} with UID {} to Connected Systems endpoint {}", + streamInfo.dataStreamID, streamInfo.sysUID, apiEndpointUrl); + }); + } catch (Exception e) + { + throw new ClientException("Error starting data push for stream " + streamInfo.topicID, e); + } + } + + protected void stopStream(StreamInfo streamInfo) throws ClientException + { + if(streamInfo.subscription != null) + { + streamInfo.subscription.cancel(); + streamInfo.subscription = null; + } + } From 7399c970f582ad2a3c1de236b941f0481ce61d30 Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Wed, 25 Sep 2024 18:59:21 -0500 Subject: [PATCH 04/15] POST subsystems to "members" endpoint --- .../sensorhub/impl/service/consys/client/ConSysApiClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 0e872b574..1989383c6 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -70,7 +70,7 @@ public class ConSysApiClient static final String DATASTREAMS_COLLECTION = "datastreams"; static final String CONTROLS_COLLECTION = "controls"; static final String OBSERVATIONS_COLLECTION = "observations"; - static final String SUBSYSTEMS_COLLECTION = "subsystems"; + static final String SUBSYSTEMS_COLLECTION = "members"; static final String SF_COLLECTION = "fois"; HttpClient http; From 427a5bc5cb17db5b61df444bfb8eecfc3c8d3fcb Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Thu, 26 Sep 2024 17:45:08 -0500 Subject: [PATCH 05/15] added connection check and subsystems check --- .../consys/client/ConSysApiClient.java | 55 ++++++++++--------- .../consys/client/ConSysApiClientModule.java | 46 +++++++++------- 2 files changed, 55 insertions(+), 46 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 1989383c6..37df5928b 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -14,16 +14,8 @@ package org.sensorhub.impl.service.consys.client; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.StringReader; -import java.net.Authenticator; -import java.net.PasswordAuthentication; -import java.net.URI; -import java.net.URISyntaxException; +import java.io.*; +import java.net.*; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse.BodyHandler; @@ -31,14 +23,15 @@ import java.net.http.HttpResponse.BodySubscriber; import java.net.http.HttpResponse.BodySubscribers; import java.net.http.HttpResponse.ResponseInfo; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.Set; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.function.Function; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import org.sensorhub.api.command.ICommandData; import org.sensorhub.api.command.ICommandStreamInfo; import org.sensorhub.api.data.IDataStreamInfo; @@ -70,7 +63,7 @@ public class ConSysApiClient static final String DATASTREAMS_COLLECTION = "datastreams"; static final String CONTROLS_COLLECTION = "controls"; static final String OBSERVATIONS_COLLECTION = "observations"; - static final String SUBSYSTEMS_COLLECTION = "members"; + static final String SUBSYSTEMS_COLLECTION = "subsystems"; static final String SF_COLLECTION = "fois"; HttpClient http; @@ -114,22 +107,30 @@ public CompletableFuture getSystemById(String id, ResourceForma }); } - - public CompletableFuture getSystemByUid(String uid, ResourceFormat format) - { - return sendGetRequest(endpoint.resolve(SYSTEMS_COLLECTION + "?uid=" + uid), format, body -> { - try - { + // TODO Needs to parse top feature from FeatureCollection, instead of trying to parse FeatureCollection as ISystemWithDesc + public CompletableFuture getSystemByUid(String uid, ResourceFormat format) throws ExecutionException, InterruptedException { + var searchUID = sendGetRequest(endpoint.resolve(SYSTEMS_COLLECTION + "?uid=" + uid), format, body -> { + try { var ctx = new RequestContext(body); - var binding = new SystemBindingGeoJson(ctx, null, null, true); - return binding.deserialize(); - } - catch (IOException e) - { + + JsonObject bodyJson = JsonParser.parseReader(new InputStreamReader(ctx.getInputStream())).getAsJsonObject(); + JsonArray features = bodyJson.getAsJsonArray("items"); + + if(features != null && !features.isEmpty()) { + JsonObject firstFeature = features.get(0).getAsJsonObject(); + String featureID = firstFeature.get("id").getAsString(); + + return featureID; + } else { + throw new ResourceParseException("No Features Found"); + } + } catch (IOException e) { e.printStackTrace(); throw new CompletionException(e); } }); + var id = searchUID.get(); + return getSystemById(id, format); } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index 2e6d4c6c2..4e1e52fa9 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -108,11 +108,15 @@ protected void doInit() throws SensorHubException protected void doStart() throws SensorHubException { // Check if endpoint is available try{ - client.sendGetRequest(URI.create(apiEndpointUrl), ResourceFormat.JSON, null); + HttpURLConnection urlConnection = (HttpURLConnection) client.endpoint.toURL().openConnection(); + urlConnection.connect(); + assert urlConnection.getResponseCode() == HttpURLConnection.HTTP_OK; } catch (Exception e) { - reportError("Unable to establish connection to Connected Systems endpoint", e); + throw new SensorHubException("Unable to establish connection to Connected Systems endpoint"); } + reportStatus("Connection to " + apiEndpointUrl + " was made successfully"); + dataBaseView.getSystemDescStore().selectEntries( new SystemFilter.Builder() .withNoParent() @@ -126,14 +130,9 @@ protected void doStart() throws SensorHubException { for (var stream : dataStreams.values()) startStream(stream); - // TODO: Check if system exists - // TODO: Ensure connection can be made - - // TODO: Subscribe to system registry + // TODO: Subscribe to system registry for system events - // TODO: Register systems/subsystems to destination SensorHub - // TODO: Register datastreams to destination - // TODO: Push observations from datastreams + // TODO: Include option to push using persistent HTTP connection } @Override @@ -147,13 +146,13 @@ protected void doStop() throws SensorHubException { protected void checkSubSystems(SystemRegInfo parentSystemRegInfo) { dataBaseView.getSystemDescStore().selectEntries( - new SystemFilter.Builder() - .withParents(parentSystemRegInfo.internalID) - .build()) - .forEach((entry) -> { - var systemRegInfo = registerSubSystem(entry.getKey().getInternalID(), parentSystemRegInfo, entry.getValue()); - registerSystemDataStreams(systemRegInfo); - }); + new SystemFilter.Builder() + .withParents(parentSystemRegInfo.internalID) + .build()) + .forEach((entry) -> { + var systemRegInfo = registerSubSystem(entry.getKey().getInternalID(), parentSystemRegInfo, entry.getValue()); + registerSystemDataStreams(systemRegInfo); + }); } protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc system) @@ -175,9 +174,18 @@ protected SystemRegInfo registerSubSystem(BigId systemInternalID, SystemRegInfo { try { var getParent = client.getSystemById(parentSystem.systemID, ResourceFormat.JSON); - if(getParent == null) + var parent = getParent.get(); + if(parent == null) throw new ClientException("Could not retrieve parent system " + parentSystem.systemID); - String systemID = client.addSubSystem(parentSystem.systemID, system).get(); + + var uidRequest = client.getSystemByUid(system.getUniqueIdentifier(), ResourceFormat.JSON); + var oldSys = uidRequest.get(); + + String systemID ; + if(oldSys.getId() != null) + systemID = oldSys.getId(); + else + systemID = client.addSubSystem(parentSystem.systemID, system).get(); SystemRegInfo systemRegInfo = new SystemRegInfo(); systemRegInfo.systemID = systemID; @@ -222,7 +230,7 @@ protected StreamInfo registerDataStream(BigId dsId, String systemID, IDataStream return streamInfo; } - protected void startStream(StreamInfo streamInfo) throws ClientException + protected synchronized void startStream(StreamInfo streamInfo) throws ClientException { try { From 393f9978a0037b6b19a2a39b349ac00a878eb71a Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Fri, 27 Sep 2024 17:16:44 -0500 Subject: [PATCH 06/15] Better checks --- .../impl/service/consys/client/ConSysApiClient.java | 4 +++- .../impl/service/consys/client/ConSysApiClientModule.java | 7 +++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 37df5928b..7c49091f3 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -122,7 +122,7 @@ public CompletableFuture getSystemByUid(String uid, ResourceFor return featureID; } else { - throw new ResourceParseException("No Features Found"); + return ""; } } catch (IOException e) { e.printStackTrace(); @@ -130,6 +130,8 @@ public CompletableFuture getSystemByUid(String uid, ResourceFor } }); var id = searchUID.get(); + if (Objects.equals(id, "")) + return null; return getSystemById(id, format); } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index 4e1e52fa9..fe3d54ccf 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -179,12 +179,11 @@ protected SystemRegInfo registerSubSystem(BigId systemInternalID, SystemRegInfo throw new ClientException("Could not retrieve parent system " + parentSystem.systemID); var uidRequest = client.getSystemByUid(system.getUniqueIdentifier(), ResourceFormat.JSON); - var oldSys = uidRequest.get(); - String systemID ; - if(oldSys.getId() != null) + if(uidRequest != null) { + var oldSys = uidRequest.get(); systemID = oldSys.getId(); - else + } else systemID = client.addSubSystem(parentSystem.systemID, system).get(); SystemRegInfo systemRegInfo = new SystemRegInfo(); From 23377d70c5b4ed17c6038429d83feb507f985cad Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Fri, 27 Sep 2024 18:38:38 -0500 Subject: [PATCH 07/15] Add system put request --- .../consys/client/ConSysApiClient.java | 45 ++++++++++++++++++- .../consys/client/ConSysApiClientModule.java | 10 ++++- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 7c49091f3..d1235af39 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -157,6 +157,27 @@ public CompletableFuture addSystem(ISystemWithDesc system) } } + public CompletableFuture updateSystem(String systemID, ISystemWithDesc system) + { + try + { + var buffer = new InMemoryBufferStreamHandler(); + var ctx = new RequestContext(buffer); + + var binding = new SystemBindingSmlJson(ctx, null, false); + binding.serialize(null, system, false); + + return sendPutRequest( + endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemID), + ResourceFormat.SML_JSON, + buffer); + } + catch (IOException e) + { + throw new IllegalStateException("Error initializing binding", e); + } + } + public CompletableFuture addSubSystem(String systemID, ISystemWithDesc system) { try @@ -178,7 +199,6 @@ public CompletableFuture addSubSystem(String systemID, ISystemWithDesc s } } - public CompletableFuture> addSystems(ISystemWithDesc... systems) { return addSystems(Arrays.asList(systems)); @@ -450,6 +470,29 @@ protected CompletableFuture sendPostRequest(URI collectionUri, ResourceF throw new CompletionException(resp.body(), null); }); } + + protected CompletableFuture sendPutRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) + { + var req = HttpRequest.newBuilder() + .uri(collectionUri) + .PUT(HttpRequest.BodyPublishers.ofInputStream(() -> body.getAsInputStream())) + .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) + .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()) + .build(); + + return http.sendAsync(req, BodyHandlers.ofString()) + .thenApply(resp -> { + if (resp.statusCode() == 201 || resp.statusCode() == 303) + { + var location = resp.headers() + .firstValue(HttpHeaders.LOCATION) + .orElseThrow(() -> new IllegalStateException("Missing Location header in response")); + return location.substring(location.lastIndexOf('/')+1); + } + else + throw new CompletionException(resp.body(), null); + }); + } protected CompletableFuture> sendBatchPostRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index fe3d54ccf..7e517ef8e 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -158,7 +158,13 @@ protected void checkSubSystems(SystemRegInfo parentSystemRegInfo) protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc system) { try { - String systemID = client.addSystem(system).get(); + var uidRequest = client.getSystemByUid(system.getUniqueIdentifier(), ResourceFormat.JSON); + String systemID; + if(uidRequest != null) { + var oldSys = uidRequest.get(); + systemID = oldSys.getId(); + } else + systemID = client.addSystem(system).get(); SystemRegInfo systemRegInfo = new SystemRegInfo(); systemRegInfo.systemID = systemID; @@ -179,7 +185,7 @@ protected SystemRegInfo registerSubSystem(BigId systemInternalID, SystemRegInfo throw new ClientException("Could not retrieve parent system " + parentSystem.systemID); var uidRequest = client.getSystemByUid(system.getUniqueIdentifier(), ResourceFormat.JSON); - String systemID ; + String systemID; if(uidRequest != null) { var oldSys = uidRequest.get(); systemID = oldSys.getId(); From 6043d8fafa05b02805d16692b3f951cc5d99f83b Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Sat, 28 Sep 2024 14:29:30 -0500 Subject: [PATCH 08/15] Edit PUT method --- .../consys/client/ConSysApiClient.java | 19 ++++---------- .../consys/client/ConSysApiClientModule.java | 26 +++++++++++++++---- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index d1235af39..63355df6b 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -18,6 +18,7 @@ import java.net.*; import java.net.http.HttpClient; import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; import java.net.http.HttpResponse.BodySubscriber; @@ -157,7 +158,7 @@ public CompletableFuture addSystem(ISystemWithDesc system) } } - public CompletableFuture updateSystem(String systemID, ISystemWithDesc system) + public CompletableFuture updateSystem(String systemID, ISystemWithDesc system) { try { @@ -198,7 +199,7 @@ public CompletableFuture addSubSystem(String systemID, ISystemWithDesc s throw new IllegalStateException("Error initializing binding", e); } } - + public CompletableFuture> addSystems(ISystemWithDesc... systems) { return addSystems(Arrays.asList(systems)); @@ -471,7 +472,7 @@ protected CompletableFuture sendPostRequest(URI collectionUri, ResourceF }); } - protected CompletableFuture sendPutRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) + protected CompletableFuture sendPutRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) { var req = HttpRequest.newBuilder() .uri(collectionUri) @@ -481,17 +482,7 @@ protected CompletableFuture sendPutRequest(URI collectionUri, ResourceFo .build(); return http.sendAsync(req, BodyHandlers.ofString()) - .thenApply(resp -> { - if (resp.statusCode() == 201 || resp.statusCode() == 303) - { - var location = resp.headers() - .firstValue(HttpHeaders.LOCATION) - .orElseThrow(() -> new IllegalStateException("Missing Location header in response")); - return location.substring(location.lastIndexOf('/')+1); - } - else - throw new CompletionException(resp.body(), null); - }); + .thenApply(HttpResponse::statusCode); } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index 7e517ef8e..3b3d7e2ce 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -1,6 +1,8 @@ package org.sensorhub.impl.service.consys.client; +import com.google.common.base.Strings; import org.checkerframework.checker.units.qual.C; +import org.eclipse.jetty.client.HttpResponse; import org.sensorhub.api.client.ClientException; import org.sensorhub.api.client.IClientModule; import org.sensorhub.api.common.BigId; @@ -19,9 +21,9 @@ import org.sensorhub.impl.service.consys.resource.ResourceFormat; import org.vast.cdm.common.DataStreamWriter; import org.vast.swe.SWEData; +import org.vast.util.Asserts; -import java.net.HttpURLConnection; -import java.net.URI; +import java.net.*; import java.util.Map; import java.util.NavigableMap; import java.util.concurrent.ConcurrentHashMap; @@ -98,7 +100,7 @@ protected void doInit() throws SensorHubException this.client = ConSysApiClient. newBuilder(apiEndpointUrl) - .simpleAuth(config.conSys.user, config.conSys.password.toCharArray()) + .simpleAuth(config.conSys.user, !config.conSys.password.isEmpty() ? config.conSys.password.toCharArray() : null) .build(); // TODO: Other initialization @@ -109,8 +111,16 @@ protected void doStart() throws SensorHubException { // Check if endpoint is available try{ HttpURLConnection urlConnection = (HttpURLConnection) client.endpoint.toURL().openConnection(); + if (!Strings.isNullOrEmpty(config.conSys.user)) { + urlConnection.setAuthenticator(new Authenticator() { + @Override + public PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(config.conSys.user, config.conSys.password != null ? config.conSys.password.toCharArray() : new char[0]); + } + }); + } urlConnection.connect(); - assert urlConnection.getResponseCode() == HttpURLConnection.HTTP_OK; + Asserts.checkArgument(urlConnection.getResponseCode() == HttpURLConnection.HTTP_OK); } catch (Exception e) { throw new SensorHubException("Unable to establish connection to Connected Systems endpoint"); } @@ -163,6 +173,9 @@ protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc s if(uidRequest != null) { var oldSys = uidRequest.get(); systemID = oldSys.getId(); + var responseCode = client.updateSystem(systemID, system).get(); + if(responseCode != 204) + throw new ClientException("There was a problem updating resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID); } else systemID = client.addSystem(system).get(); @@ -171,7 +184,7 @@ protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc s systemRegInfo.internalID = systemInternalID; systemRegInfo.system = system; return systemRegInfo; - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException | ClientException e) { throw new RuntimeException(e); } } @@ -189,6 +202,9 @@ protected SystemRegInfo registerSubSystem(BigId systemInternalID, SystemRegInfo if(uidRequest != null) { var oldSys = uidRequest.get(); systemID = oldSys.getId(); + var responseCode = client.updateSystem(systemID, system).get(); + if(responseCode != 204) + throw new ClientException("There was a problem updating resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID); } else systemID = client.addSubSystem(parentSystem.systemID, system).get(); From 914f763ea43dbfb088826c6cb684dadc2cb8cd84 Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Sat, 28 Sep 2024 19:13:04 -0500 Subject: [PATCH 09/15] Update DeploymentBindingHtml.java --- .../consys/deployment/DeploymentBindingHtml.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/deployment/DeploymentBindingHtml.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/deployment/DeploymentBindingHtml.java index 9ae87b574..3d7066ae0 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/deployment/DeploymentBindingHtml.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/deployment/DeploymentBindingHtml.java @@ -71,19 +71,18 @@ protected String getResourceUrl(FeatureKey key) var procId = idEncoders.getDeploymentIdEncoder().encodeID(key.getInternalID()); return ctx.getApiRootURL() + "/" + DeploymentHandler.NAMES[0] + "/" + procId; } - - + + @Override protected DivTag getLinks(String resourceUrl, FeatureKey key, IDeploymentWithDesc f) { var deplId = idEncoders.getDeploymentIdEncoder().encodeID(key.getInternalID()); - DivTag div = div( - (DomContent) iff(assocs.getParentLink(deplId, ResourceFormat.HTML), + return div((DomContent) + iff(assocs.getParentLink(deplId, ResourceFormat.HTML), link -> getLinkButton("Parent Deployment", link.getHref())), - (DomContent) iff(assocs.getSubdeploymentsLink(deplId, ResourceFormat.HTML), + iff(assocs.getSubdeploymentsLink(deplId, ResourceFormat.HTML), link -> getLinkButton("Subdeployments", link.getHref())) ); - return div; } -} +} \ No newline at end of file From 06d99d7f841e25213d01a38f8d270b3cc6a8bfc3 Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Sun, 29 Sep 2024 16:29:37 -0500 Subject: [PATCH 10/15] Add binary format for pushing observations --- .../consys/client/ConSysApiClient.java | 26 ++++++++++++++----- .../consys/client/ConSysApiClientModule.java | 16 ++++++++---- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 63355df6b..e677555ad 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -33,8 +33,10 @@ import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import net.opengis.swe.v20.BinaryEncoding; import org.sensorhub.api.command.ICommandData; import org.sensorhub.api.command.ICommandStreamInfo; +import org.sensorhub.api.common.BigId; import org.sensorhub.api.data.IDataStreamInfo; import org.sensorhub.api.data.IObsData; import org.sensorhub.api.datastore.obs.IObsStore; @@ -42,7 +44,10 @@ import org.sensorhub.impl.service.consys.ResourceParseException; import org.sensorhub.impl.service.consys.obs.DataStreamBindingJson; import org.sensorhub.impl.service.consys.obs.ObsBindingOmJson; +import org.sensorhub.impl.service.consys.obs.ObsBindingSweCommon; +import org.sensorhub.impl.service.consys.obs.ObsHandler; import org.sensorhub.impl.service.consys.resource.RequestContext; +import org.sensorhub.impl.service.consys.resource.ResourceBinding; import org.sensorhub.impl.service.consys.resource.ResourceFormat; import org.sensorhub.impl.service.consys.resource.ResourceLink; import org.sensorhub.impl.service.consys.stream.StreamHandler; @@ -384,20 +389,27 @@ protected void endJsonCollection(JsonWriter writer, Collection lin /*--------------*/ /* Observations */ /*--------------*/ - - public CompletableFuture pushObs(String datastreamId, IObsData obs, IObsStore obsStore) + // TODO: Be able to push different kinds of observations such as video + public CompletableFuture pushObs(String dataStreamId, IDataStreamInfo dataStream, IObsData obs, IObsStore obsStore) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - var binding = new ObsBindingOmJson(ctx, null, false, obsStore); - binding.serializeCreate(obs); - + if(dataStream != null && dataStream.getRecordEncoding() instanceof BinaryEncoding) { + var binding = new ObsBindingSweCommon(ctx, null, false, obsStore); + binding.serialize(null, obs, false); + ctx.setFormat(ResourceFormat.SWE_BINARY); + } else { + var binding = new ObsBindingOmJson(ctx, null, false, obsStore); + binding.serializeCreate(obs); + ctx.setFormat(ResourceFormat.OM_JSON); + } + return sendPostRequest( - endpoint.resolve(DATASTREAMS_COLLECTION + "/" + datastreamId + "/" + OBSERVATIONS_COLLECTION), - ResourceFormat.JSON, + endpoint.resolve(DATASTREAMS_COLLECTION + "/" + dataStreamId + "/" + OBSERVATIONS_COLLECTION), + ctx.getFormat(), buffer); } catch (IOException e) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index 3b3d7e2ce..d644e5927 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -26,6 +26,7 @@ import java.net.*; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; @@ -50,6 +51,7 @@ static class SystemRegInfo static class StreamInfo { + private IDataStreamInfo dataStream; private String dataStreamID; private String topicID; private BigId internalID; @@ -224,11 +226,14 @@ protected void registerSystemDataStreams(SystemRegInfo system) new DataStreamFilter.Builder() .withSystems(new SystemFilter.Builder() .withUniqueIDs(system.system.getUniqueIdentifier()) - .build()) + .build()) .build()) .forEach((entry) -> { - var streamInfo = registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue()); - dataStreams.put(streamInfo.dataStreamID, streamInfo); + if(Objects.equals(entry.getValue().getSystemID().getUniqueID(), system.system.getUniqueIdentifier())) + { + var streamInfo = registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue()); + dataStreams.put(streamInfo.dataStreamID, streamInfo); + } }); } @@ -243,6 +248,7 @@ protected StreamInfo registerDataStream(BigId dsId, String systemID, IDataStream throw new RuntimeException(e); } + streamInfo.dataStream = dataStream; streamInfo.topicID = dsTopicId; streamInfo.outputName = dataStream.getOutputName(); streamInfo.sysUID = dataStream.getSystemID().getUniqueID(); @@ -272,7 +278,7 @@ protected synchronized void startStream(StreamInfo streamInfo) throws ClientExce .withLatestResult() .build()) .forEach(obs -> - client.pushObs(streamInfo.dataStreamID, obs, this.dataBaseView.getObservationStore())); + client.pushObs(streamInfo.dataStreamID, streamInfo.dataStream, obs, this.dataBaseView.getObservationStore())); getLogger().info("Starting Connected Systems data push for stream {} with UID {} to Connected Systems endpoint {}", streamInfo.dataStreamID, streamInfo.sysUID, apiEndpointUrl); @@ -303,7 +309,7 @@ public boolean isConnected() protected void handleEvent(final ObsEvent e, StreamInfo streamInfo) { for(var obs : e.getObservations()) - client.pushObs(streamInfo.dataStreamID, obs, this.dataBaseView.getObservationStore()); + client.pushObs(streamInfo.dataStreamID, streamInfo.dataStream, obs, this.dataBaseView.getObservationStore()); } } From b7eeb3a93dbfa73c9059bab9f937c231284776c3 Mon Sep 17 00:00:00 2001 From: earocorn Date: Wed, 2 Oct 2024 14:50:50 -0500 Subject: [PATCH 11/15] Push video correctly --- .../consys/client/ConSysApiClient.java | 174 +++++++++--------- .../consys/resource/RequestContext.java | 5 +- 2 files changed, 91 insertions(+), 88 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index e677555ad..cbbc10dbf 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -7,9 +7,9 @@ Software distributed under the License is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. - + Copyright (C) 2023 Sensia Software LLC. All Rights Reserved. - + ******************************* END LICENSE BLOCK ***************************/ package org.sensorhub.impl.service.consys.client; @@ -36,18 +36,17 @@ import net.opengis.swe.v20.BinaryEncoding; import org.sensorhub.api.command.ICommandData; import org.sensorhub.api.command.ICommandStreamInfo; -import org.sensorhub.api.common.BigId; import org.sensorhub.api.data.IDataStreamInfo; import org.sensorhub.api.data.IObsData; import org.sensorhub.api.datastore.obs.IObsStore; import org.sensorhub.api.system.ISystemWithDesc; import org.sensorhub.impl.service.consys.ResourceParseException; +import org.sensorhub.impl.service.consys.SWECommonUtils; import org.sensorhub.impl.service.consys.obs.DataStreamBindingJson; import org.sensorhub.impl.service.consys.obs.ObsBindingOmJson; import org.sensorhub.impl.service.consys.obs.ObsBindingSweCommon; import org.sensorhub.impl.service.consys.obs.ObsHandler; import org.sensorhub.impl.service.consys.resource.RequestContext; -import org.sensorhub.impl.service.consys.resource.ResourceBinding; import org.sensorhub.impl.service.consys.resource.ResourceFormat; import org.sensorhub.impl.service.consys.resource.ResourceLink; import org.sensorhub.impl.service.consys.stream.StreamHandler; @@ -71,31 +70,31 @@ public class ConSysApiClient static final String OBSERVATIONS_COLLECTION = "observations"; static final String SUBSYSTEMS_COLLECTION = "subsystems"; static final String SF_COLLECTION = "fois"; - + HttpClient http; URI endpoint; - - + + static class InMemoryBufferStreamHandler implements StreamHandler { ByteArrayOutputStream os = new ByteArrayOutputStream(); - + public void setStartCallback(Runnable onStart) {} public void setCloseCallback(Runnable onClose) {} public void sendPacket() throws IOException {} public void close() {} public OutputStream getOutputStream() { return os; } - public InputStream getAsInputStream() { return new ByteArrayInputStream(os.toByteArray()); } + public InputStream getAsInputStream() { return new BufferedInputStream(new ByteArrayInputStream(os.toByteArray()), 8192); } } - - + + protected ConSysApiClient() {} - - + + /*---------*/ /* Systems */ /*---------*/ - + public CompletableFuture getSystemById(String id, ResourceFormat format) { return sendGetRequest(endpoint.resolve(SYSTEMS_COLLECTION + "/" + id), format, body -> { @@ -112,7 +111,7 @@ public CompletableFuture getSystemById(String id, ResourceForma } }); } - + // TODO Needs to parse top feature from FeatureCollection, instead of trying to parse FeatureCollection as ISystemWithDesc public CompletableFuture getSystemByUid(String uid, ResourceFormat format) throws ExecutionException, InterruptedException { var searchUID = sendGetRequest(endpoint.resolve(SYSTEMS_COLLECTION + "?uid=" + uid), format, body -> { @@ -140,18 +139,18 @@ public CompletableFuture getSystemByUid(String uid, ResourceFor return null; return getSystemById(id, format); } - - + + public CompletableFuture addSystem(ISystemWithDesc system) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - + var binding = new SystemBindingSmlJson(ctx, null, false); binding.serialize(null, system, false); - + return sendPostRequest( endpoint.resolve(SYSTEMS_COLLECTION), ResourceFormat.SML_JSON, @@ -209,33 +208,33 @@ public CompletableFuture> addSystems(ISystemWithDesc... systems) { return addSystems(Arrays.asList(systems)); } - - + + public CompletableFuture> addSystems(Collection systems) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - + var binding = new SystemBindingSmlJson(ctx, null, false) { protected void startJsonCollection(JsonWriter writer) throws IOException { writer.beginArray(); } - + protected void endJsonCollection(JsonWriter writer, Collection links) throws IOException { writer.endArray(); writer.flush(); } }; - + binding.startCollection(); for (var sys: systems) binding.serialize(null, sys, false); binding.endCollection(Collections.emptyList()); - + return sendBatchPostRequest( endpoint.resolve(SYSTEMS_COLLECTION), ResourceFormat.SML_JSON, @@ -246,22 +245,22 @@ protected void endJsonCollection(JsonWriter writer, Collection lin throw new IllegalStateException("Error initializing binding", e); } } - - + + /*-------------*/ /* Datastreams */ /*-------------*/ - + public CompletableFuture addDataStream(String systemId, IDataStreamInfo datastream) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - + var binding = new DataStreamBindingJson(ctx, null, null, false, Collections.emptyMap()); binding.serializeCreate(datastream); - + return sendPostRequest( endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + DATASTREAMS_COLLECTION), ResourceFormat.JSON, @@ -272,39 +271,39 @@ public CompletableFuture addDataStream(String systemId, IDataStreamInfo throw new IllegalStateException("Error initializing binding", e); } } - - + + public CompletableFuture> addDataStreams(String systemId, IDataStreamInfo... datastreams) { return addDataStreams(systemId, Arrays.asList(datastreams)); } - - + + public CompletableFuture> addDataStreams(String systemId, Collection datastreams) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - + var binding = new DataStreamBindingJson(ctx, null, null, false, Collections.emptyMap()) { protected void startJsonCollection(JsonWriter writer) throws IOException { writer.beginArray(); } - + protected void endJsonCollection(JsonWriter writer, Collection links) throws IOException { writer.endArray(); writer.flush(); } }; - + binding.startCollection(); for (var ds: datastreams) binding.serializeCreate(ds); binding.endCollection(Collections.emptyList()); - + return sendBatchPostRequest( endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + DATASTREAMS_COLLECTION), ResourceFormat.JSON, @@ -315,22 +314,22 @@ protected void endJsonCollection(JsonWriter writer, Collection lin throw new IllegalStateException("Error initializing binding", e); } } - - + + /*-----------------*/ /* Control Streams */ /*-----------------*/ - + public CompletableFuture addControlStream(String systemId, ICommandStreamInfo cmdstream) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - + var binding = new CommandStreamBindingJson(ctx, null, null, false); binding.serializeCreate(cmdstream); - + return sendPostRequest( endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + CONTROLS_COLLECTION), ResourceFormat.JSON, @@ -341,39 +340,39 @@ public CompletableFuture addControlStream(String systemId, ICommandStrea throw new IllegalStateException("Error initializing binding", e); } } - - + + public CompletableFuture> addControlStreams(String systemId, ICommandStreamInfo... cmdstreams) { return addControlStreams(systemId, Arrays.asList(cmdstreams)); } - - + + public CompletableFuture> addControlStreams(String systemId, Collection cmdstreams) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - + var binding = new CommandStreamBindingJson(ctx, null, null, false) { protected void startJsonCollection(JsonWriter writer) throws IOException { writer.beginArray(); } - + protected void endJsonCollection(JsonWriter writer, Collection links) throws IOException { writer.endArray(); writer.flush(); } }; - + binding.startCollection(); for (var ds: cmdstreams) binding.serializeCreate(ds); binding.endCollection(Collections.emptyList()); - + return sendBatchPostRequest( endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + CONTROLS_COLLECTION), ResourceFormat.JSON, @@ -384,8 +383,8 @@ protected void endJsonCollection(JsonWriter writer, Collection lin throw new IllegalStateException("Error initializing binding", e); } } - - + + /*--------------*/ /* Observations */ /*--------------*/ @@ -394,19 +393,23 @@ public CompletableFuture pushObs(String dataStreamId, IDataStreamInfo da { try { + ObsHandler.ObsHandlerContextData contextData = new ObsHandler.ObsHandlerContextData(); + contextData.dsInfo = dataStream; + var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); if(dataStream != null && dataStream.getRecordEncoding() instanceof BinaryEncoding) { + ctx.setData(contextData); + ctx.setFormat(ResourceFormat.SWE_BINARY); var binding = new ObsBindingSweCommon(ctx, null, false, obsStore); binding.serialize(null, obs, false); - ctx.setFormat(ResourceFormat.SWE_BINARY); } else { + ctx.setFormat(ResourceFormat.OM_JSON); var binding = new ObsBindingOmJson(ctx, null, false, obsStore); binding.serializeCreate(obs); - ctx.setFormat(ResourceFormat.OM_JSON); } - + return sendPostRequest( endpoint.resolve(DATASTREAMS_COLLECTION + "/" + dataStreamId + "/" + OBSERVATIONS_COLLECTION), ctx.getFormat(), @@ -417,18 +420,18 @@ public CompletableFuture pushObs(String dataStreamId, IDataStreamInfo da throw new IllegalStateException("Error initializing binding", e); } } - - + + /*----------*/ /* Commands */ /*----------*/ - + public CompletableFuture sendCommand(String controlId, ICommandData cmd) { return null; } - - + + protected CompletableFuture sendGetRequest(URI collectionUri, ResourceFormat format, Function bodyMapper) { var req = HttpRequest.newBuilder() @@ -436,7 +439,7 @@ protected CompletableFuture sendGetRequest(URI collectionUri, ResourceFor .GET() .header(HttpHeaders.ACCEPT, format.getMimeType()) .build(); - + var bodyHandler = new BodyHandler() { @Override public BodySubscriber apply(ResponseInfo resp) @@ -450,7 +453,7 @@ public BodySubscriber apply(ResponseInfo resp) }); } }; - + return http.sendAsync(req, bodyHandler) .thenApply(resp -> { if (resp.statusCode() == 200) @@ -459,17 +462,16 @@ public BodySubscriber apply(ResponseInfo resp) throw new CompletionException("HTTP error " + resp.statusCode(), null); }); } - - + protected CompletableFuture sendPostRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) { var req = HttpRequest.newBuilder() .uri(collectionUri) - .POST(HttpRequest.BodyPublishers.ofInputStream(() -> body.getAsInputStream())) + .POST(HttpRequest.BodyPublishers.ofInputStream(body::getAsInputStream)) .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()) .build(); - + return http.sendAsync(req, BodyHandlers.ofString()) .thenApply(resp -> { if (resp.statusCode() == 201 || resp.statusCode() == 303) @@ -496,8 +498,8 @@ protected CompletableFuture sendPutRequest(URI collectionUri, ResourceF return http.sendAsync(req, BodyHandlers.ofString()) .thenApply(HttpResponse::statusCode); } - - + + protected CompletableFuture> sendBatchPostRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) { var req = HttpRequest.newBuilder() @@ -505,7 +507,7 @@ protected CompletableFuture> sendBatchPostRequest(URI collectionUri, .POST(HttpRequest.BodyPublishers.ofInputStream(() -> body.getAsInputStream())) .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()) .build(); - + return http.sendAsync(req, BodyHandlers.ofString()) .thenApply(Lambdas.checked(resp -> { if (resp.statusCode() == 201 || resp.statusCode() == 303) @@ -527,27 +529,27 @@ protected CompletableFuture> sendBatchPostRequest(URI collectionUri, throw new ResourceParseException(resp.body()); })); } - - + + /* Builder stuff */ - + public static ConSysApiClientBuilder newBuilder(String endpoint) { Asserts.checkNotNull(endpoint, "endpoint"); return new ConSysApiClientBuilder(endpoint); } - - + + public static class ConSysApiClientBuilder extends BaseBuilder { HttpClient.Builder httpClientBuilder; - - + + ConSysApiClientBuilder(String endpoint) { this.instance = new ConSysApiClient(); this.httpClientBuilder = HttpClient.newBuilder(); - + try { if (!endpoint.endsWith("/")) @@ -559,15 +561,15 @@ public static class ConSysApiClientBuilder extends BaseBuilder throw new IllegalArgumentException("Invalid URI " + endpoint); } } - - + + public ConSysApiClientBuilder useHttpClient(HttpClient http) { instance.http = http; return this; } - - + + public ConSysApiClientBuilder simpleAuth(String user, char[] password) { if (!Strings.isNullOrEmpty(user)) @@ -580,16 +582,16 @@ protected PasswordAuthentication getPasswordAuthentication() { } }); } - + return this; } - - + + public ConSysApiClient build() { if (instance.http == null) instance.http = httpClientBuilder.build(); - + return instance; } } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/resource/RequestContext.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/resource/RequestContext.java index 3275131ea..0012fc700 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/resource/RequestContext.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/resource/RequestContext.java @@ -14,6 +14,7 @@ package org.sensorhub.impl.service.consys.resource; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -299,7 +300,7 @@ public Object getData() { return data; } - + public void setData(Object data) { @@ -432,7 +433,7 @@ public StreamHandler getStreamHandler() public InputStream getInputStream() throws IOException { - return req != null ? req.getInputStream() : inputStream; + return req != null ? new BufferedInputStream(req.getInputStream()) : inputStream; } From 65f0c26ff45608db6ba534167a96a8638719a337 Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:39:15 -0500 Subject: [PATCH 12/15] Remove conn config httpclient already tries persistent connection --- .../consys/client/ConSysApiClient.java | 1 - .../consys/client/ConSysApiClientConfig.java | 32 +++++++++---------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index cbbc10dbf..dcd262b07 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -41,7 +41,6 @@ import org.sensorhub.api.datastore.obs.IObsStore; import org.sensorhub.api.system.ISystemWithDesc; import org.sensorhub.impl.service.consys.ResourceParseException; -import org.sensorhub.impl.service.consys.SWECommonUtils; import org.sensorhub.impl.service.consys.obs.DataStreamBindingJson; import org.sensorhub.impl.service.consys.obs.ObsBindingOmJson; import org.sensorhub.impl.service.consys.obs.ObsBindingSweCommon; diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java index 86cdc091d..8dc01a237 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java @@ -18,22 +18,22 @@ public class ConSysApiClientConfig extends ClientConfig { @DisplayInfo(label="Connection Options") - public ConSysConnectionConfig connection = new ConSysConnectionConfig(); - - - public static class ConSysConnectionConfig extends RobustIPConnectionConfig - { - @DisplayInfo(desc="Enable to use a persistent HTTP connection for InsertResult") - public boolean usePersistentConnection; - - - @DisplayInfo(desc="Maximum number of records in upload queue (used to compensate for variable bandwidth)") - public int maxQueueSize = 10; - - - @DisplayInfo(desc="Maximum number of stream errors before we try to reconnect to remote server") - public int maxConnectErrors = 10; - } + public RobustIPConnectionConfig connection = new RobustIPConnectionConfig(); + + +// public static class ConSysConnectionConfig extends RobustIPConnectionConfig +// { +// @DisplayInfo(desc="Enable to use a persistent HTTP connection for InsertResult") +// public boolean usePersistentConnection; +// +// +// @DisplayInfo(desc="Maximum number of records in upload queue (used to compensate for variable bandwidth)") +// public int maxQueueSize = 10; +// +// +// @DisplayInfo(desc="Maximum number of stream errors before we try to reconnect to remote server") +// public int maxConnectErrors = 10; +// } public ConSysApiClientConfig() From 7a643fc8f7b92ca91d71b888ee088f417822e95b Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Wed, 2 Oct 2024 19:55:55 -0500 Subject: [PATCH 13/15] Add system event handling --- .../consys/client/ConSysApiClientModule.java | 258 ++++++++++++++---- 1 file changed, 199 insertions(+), 59 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index d644e5927..6bd526dc4 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -1,29 +1,30 @@ package org.sensorhub.impl.service.consys.client; import com.google.common.base.Strings; -import org.checkerframework.checker.units.qual.C; -import org.eclipse.jetty.client.HttpResponse; import org.sensorhub.api.client.ClientException; import org.sensorhub.api.client.IClientModule; import org.sensorhub.api.common.BigId; import org.sensorhub.api.common.SensorHubException; -import org.sensorhub.api.data.DataStreamInfo; -import org.sensorhub.api.data.IDataStreamInfo; -import org.sensorhub.api.data.ObsEvent; +import org.sensorhub.api.data.*; import org.sensorhub.api.database.IObsSystemDatabase; import org.sensorhub.api.datastore.obs.DataStreamFilter; import org.sensorhub.api.datastore.obs.ObsFilter; import org.sensorhub.api.datastore.system.SystemFilter; import org.sensorhub.api.event.EventUtils; import org.sensorhub.api.system.ISystemWithDesc; +import org.sensorhub.api.system.SystemAddedEvent; +import org.sensorhub.api.system.SystemChangedEvent; +import org.sensorhub.api.system.SystemEnabledEvent; +import org.sensorhub.api.system.SystemRemovedEvent; +import org.sensorhub.api.system.SystemDisabledEvent; +import org.sensorhub.api.system.SystemEvent; import org.sensorhub.impl.module.AbstractModule; -import org.sensorhub.impl.module.RobustConnection; import org.sensorhub.impl.service.consys.resource.ResourceFormat; -import org.vast.cdm.common.DataStreamWriter; -import org.vast.swe.SWEData; import org.vast.util.Asserts; -import java.net.*; +import java.net.Authenticator; +import java.net.HttpURLConnection; +import java.net.PasswordAuthentication; import java.util.Map; import java.util.NavigableMap; import java.util.Objects; @@ -31,17 +32,18 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow; +import java.util.concurrent.CompletableFuture; public class ConSysApiClientModule extends AbstractModule implements IClientModule { - RobustConnection connection; IObsSystemDatabase dataBaseView; String apiEndpointUrl; ConSysApiClient client; Map registeredSystems; NavigableMap dataStreams; + Flow.Subscription registrySubscription; - static class SystemRegInfo + public static class SystemRegInfo { private String systemID; private BigId internalID; @@ -49,19 +51,14 @@ static class SystemRegInfo private ISystemWithDesc system; } - static class StreamInfo + public static class StreamInfo { private IDataStreamInfo dataStream; private String dataStreamID; private String topicID; private BigId internalID; private String sysUID; - private String outputName; private Flow.Subscription subscription; - private HttpURLConnection connection; - private DataStreamWriter persistentWriter; - private volatile boolean connecting = false; - private volatile boolean stopping = false; } public ConSysApiClientModule() @@ -88,16 +85,9 @@ public void setConfiguration(ConSysApiClientConfig config) } } - protected void checkConfiguration() throws SensorHubException - { - // TODO check config - } - @Override protected void doInit() throws SensorHubException { - checkConfiguration(); - this.dataBaseView = config.dataSourceSelector.getFilteredView(getParentHub()); this.client = ConSysApiClient. @@ -139,12 +129,10 @@ public PasswordAuthentication getPasswordAuthentication() { registerSystemDataStreams(systemRegInfo); }); + subscribeToRegistryEvents(); + for (var stream : dataStreams.values()) startStream(stream); - - // TODO: Subscribe to system registry for system events - - // TODO: Include option to push using persistent HTTP connection } @Override @@ -155,6 +143,18 @@ protected void doStop() throws SensorHubException { stopStream(stream); } + protected void subscribeToRegistryEvents() + { + getParentHub().getEventBus().newSubscription(SystemEvent.class) + .withTopicID(EventUtils.getSystemRegistryTopicID()) + .withEventType(SystemEvent.class) + .subscribe(this::handleEvent) + .thenAccept(sub -> { + registrySubscription = sub; + sub.request(Long.MAX_VALUE); + }); + } + protected void checkSubSystems(SystemRegInfo parentSystemRegInfo) { dataBaseView.getSystemDescStore().selectEntries( @@ -167,7 +167,7 @@ protected void checkSubSystems(SystemRegInfo parentSystemRegInfo) }); } - protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc system) + private String tryUpdateSystem(ISystemWithDesc system) { try { var uidRequest = client.getSystemByUid(system.getUniqueIdentifier(), ResourceFormat.JSON); @@ -176,17 +176,47 @@ protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc s var oldSys = uidRequest.get(); systemID = oldSys.getId(); var responseCode = client.updateSystem(systemID, system).get(); - if(responseCode != 204) - throw new ClientException("There was a problem updating resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID); - } else + boolean successful = responseCode == 204; + if(!successful) + throw new ClientException("Failed to update resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID); + return systemID; + } + } catch (ExecutionException | InterruptedException | ClientException e) { + throw new RuntimeException(e); + } + return null; + } + + private void disableSystem(String uid, boolean remove) + { + var sysInfo = remove ? registeredSystems.remove(uid) : registeredSystems.get(uid); + if(sysInfo != null) + { + if(sysInfo.subscription != null) + { + sysInfo.subscription.cancel(); + sysInfo.subscription = null; + getLogger().debug("Unsubscribed from system {}", uid); + } + + var sysDataStreams = dataStreams.subMap(uid, uid + "\uffff"); + for(var streamInfo : sysDataStreams.values()) + stopStream(streamInfo); + + if(remove) + getLogger().info("Removed system {}", uid); + } + } + + protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc system) + { + try { + String systemID = tryUpdateSystem(system); + if(systemID == null) systemID = client.addSystem(system).get(); - SystemRegInfo systemRegInfo = new SystemRegInfo(); - systemRegInfo.systemID = systemID; - systemRegInfo.internalID = systemInternalID; - systemRegInfo.system = system; - return systemRegInfo; - } catch (InterruptedException | ExecutionException | ClientException e) { + return registerSystemInfo(systemID, systemInternalID, system); + } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } @@ -199,27 +229,36 @@ protected SystemRegInfo registerSubSystem(BigId systemInternalID, SystemRegInfo if(parent == null) throw new ClientException("Could not retrieve parent system " + parentSystem.systemID); - var uidRequest = client.getSystemByUid(system.getUniqueIdentifier(), ResourceFormat.JSON); - String systemID; - if(uidRequest != null) { - var oldSys = uidRequest.get(); - systemID = oldSys.getId(); - var responseCode = client.updateSystem(systemID, system).get(); - if(responseCode != 204) - throw new ClientException("There was a problem updating resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID); - } else + String systemID = tryUpdateSystem(system); + if(systemID == null) systemID = client.addSubSystem(parentSystem.systemID, system).get(); - SystemRegInfo systemRegInfo = new SystemRegInfo(); - systemRegInfo.systemID = systemID; - systemRegInfo.internalID = systemInternalID; - systemRegInfo.system = system; - return systemRegInfo; + return registerSystemInfo(systemID, systemInternalID, system); } catch (InterruptedException | ExecutionException | ClientException e) { throw new RuntimeException(e); } } + private SystemRegInfo registerSystemInfo(String systemID, BigId systemInternalID, ISystemWithDesc system) + { + SystemRegInfo systemRegInfo = new SystemRegInfo(); + systemRegInfo.systemID = systemID; + systemRegInfo.internalID = systemInternalID; + systemRegInfo.system = system; + + getParentHub().getEventBus().newSubscription(SystemEvent.class) + .withTopicID(EventUtils.getSystemStatusTopicID(system.getUniqueIdentifier())) + .withEventType(SystemEvent.class) + .subscribe(this::handleEvent) + .thenAccept(sub -> { + systemRegInfo.subscription = sub; + sub.request(Long.MAX_VALUE); + }); + + registeredSystems.put(system.getUniqueIdentifier(), systemRegInfo); + return systemRegInfo; + } + protected void registerSystemDataStreams(SystemRegInfo system) { dataBaseView.getDataStreamStore().selectEntries( @@ -230,10 +269,7 @@ protected void registerSystemDataStreams(SystemRegInfo system) .build()) .forEach((entry) -> { if(Objects.equals(entry.getValue().getSystemID().getUniqueID(), system.system.getUniqueIdentifier())) - { - var streamInfo = registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue()); - dataStreams.put(streamInfo.dataStreamID, streamInfo); - } + registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue()); }); } @@ -250,13 +286,46 @@ protected StreamInfo registerDataStream(BigId dsId, String systemID, IDataStream streamInfo.dataStream = dataStream; streamInfo.topicID = dsTopicId; - streamInfo.outputName = dataStream.getOutputName(); streamInfo.sysUID = dataStream.getSystemID().getUniqueID(); streamInfo.internalID = dsId; + dataStreams.put(dsTopicId, streamInfo); return streamInfo; } + protected void addAndStartStream(String uid, String outputName) + { + try + { + var sysInfo = registeredSystems.get(uid); + var dsEntry = dataBaseView.getDataStreamStore().getLatestVersionEntry(uid, outputName); + if(sysInfo != null && dsEntry != null) + { + var dsId = dsEntry.getKey().getInternalID(); + var dsInfo = dsEntry.getValue(); + + var streamInfo = registerDataStream(dsId, sysInfo.systemID, dsInfo); + startStream(streamInfo); + } + } + catch (ClientException e) + { + reportError(e.getMessage(), e.getCause()); + } + } + + protected void disableDataStream(String uid, String outputName, boolean remove) + { + var dsSourceId = EventUtils.getDataStreamDataTopicID(uid, outputName); + var streamInfo = remove ? dataStreams.remove(dsSourceId) : dataStreams.get(dsSourceId); + if(streamInfo != null) + { + stopStream(streamInfo); + if(remove) + getLogger().info("Removed datastream {}", dsSourceId); + } + } + protected synchronized void startStream(StreamInfo streamInfo) throws ClientException { try @@ -289,7 +358,7 @@ protected synchronized void startStream(StreamInfo streamInfo) throws ClientExce } } - protected void stopStream(StreamInfo streamInfo) throws ClientException + protected void stopStream(StreamInfo streamInfo) { if(streamInfo.subscription != null) { @@ -297,7 +366,7 @@ protected void stopStream(StreamInfo streamInfo) throws ClientException streamInfo.subscription = null; } - + // TODO Check other stuff } @Override @@ -312,4 +381,75 @@ protected void handleEvent(final ObsEvent e, StreamInfo streamInfo) client.pushObs(streamInfo.dataStreamID, streamInfo.dataStream, obs, this.dataBaseView.getObservationStore()); } + protected void handleEvent(final SystemEvent e) + { + // sensor description updated + if (e instanceof SystemChangedEvent) + { + CompletableFuture.runAsync(() -> { + var system = dataBaseView.getSystemDescStore().getCurrentVersion(e.getSystemUID()); + if(system != null) + tryUpdateSystem(system); + }); + } + + // system events + else if (e instanceof SystemAddedEvent || e instanceof SystemEnabledEvent) + { + CompletableFuture.runAsync(() -> { + var system = dataBaseView.getSystemDescStore().getCurrentVersionEntry(e.getSystemUID()); + if(system != null) + { + var systemRegInfo = registerSystem(system.getKey().getInternalID(), system.getValue()); + checkSubSystems(systemRegInfo); + registerSystemDataStreams(systemRegInfo); + } + }); + } + + else if (e instanceof SystemDisabledEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + disableSystem(sysUID, false); + }); + } + + else if (e instanceof SystemRemovedEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + disableSystem(sysUID, true); + }); + } + + // datastream events + else if (e instanceof DataStreamAddedEvent || e instanceof DataStreamEnabledEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + var outputName = ((DataStreamEvent) e).getOutputName(); + addAndStartStream(sysUID, outputName); + }); + } + + else if (e instanceof DataStreamDisabledEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + var outputName = ((DataStreamEvent) e).getOutputName(); + disableDataStream(sysUID, outputName, false); + }); + } + + else if (e instanceof DataStreamRemovedEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + var outputName = ((DataStreamEvent) e).getOutputName(); + disableDataStream(sysUID, outputName, true); + }); + } + } + } From 818a62ba9a21ddc24575ae805ac5acd56dd9c0bb Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Wed, 2 Oct 2024 21:58:09 -0500 Subject: [PATCH 14/15] start newly added datastreams --- .../consys/client/ConSysApiClientModule.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index 6bd526dc4..a24ed4799 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -25,9 +25,7 @@ import java.net.Authenticator; import java.net.HttpURLConnection; import java.net.PasswordAuthentication; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Objects; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; @@ -259,8 +257,10 @@ private SystemRegInfo registerSystemInfo(String systemID, BigId systemInternalID return systemRegInfo; } - protected void registerSystemDataStreams(SystemRegInfo system) + protected List registerSystemDataStreams(SystemRegInfo system) { + List addedStreams = new ArrayList<>(); + dataBaseView.getDataStreamStore().selectEntries( new DataStreamFilter.Builder() .withSystems(new SystemFilter.Builder() @@ -269,8 +269,9 @@ protected void registerSystemDataStreams(SystemRegInfo system) .build()) .forEach((entry) -> { if(Objects.equals(entry.getValue().getSystemID().getUniqueID(), system.system.getUniqueIdentifier())) - registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue()); + addedStreams.add(registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue())); }); + return addedStreams; } protected StreamInfo registerDataStream(BigId dsId, String systemID, IDataStreamInfo dataStream) @@ -377,6 +378,7 @@ public boolean isConnected() protected void handleEvent(final ObsEvent e, StreamInfo streamInfo) { + var length = e.getObservations().length; for(var obs : e.getObservations()) client.pushObs(streamInfo.dataStreamID, streamInfo.dataStream, obs, this.dataBaseView.getObservationStore()); } @@ -402,7 +404,14 @@ else if (e instanceof SystemAddedEvent || e instanceof SystemEnabledEvent) { var systemRegInfo = registerSystem(system.getKey().getInternalID(), system.getValue()); checkSubSystems(systemRegInfo); - registerSystemDataStreams(systemRegInfo); + var newStreams = registerSystemDataStreams(systemRegInfo); + for(var streamInfo : newStreams) { + try { + startStream(streamInfo); + } catch (ClientException ex) { + throw new RuntimeException(ex); + } + } } }); } From 8d77d66b91b7636a2e44887a683f82caff0d078a Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Sat, 5 Oct 2024 15:55:28 -0500 Subject: [PATCH 15/15] Update ConSysApiClient.java --- .../sensorhub/impl/service/consys/client/ConSysApiClient.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index d4dc52fde..a5fa6d4ce 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -18,6 +18,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.StringReader; import java.net.Authenticator; @@ -41,6 +42,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.function.Function; +import java.util.Objects; import com.google.gson.JsonArray; import com.google.gson.JsonObject;