diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 2dd66a9d1..a5fa6d4ce 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -7,9 +7,9 @@ Software distributed under the License is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. - + Copyright (C) 2023 Sensia Software LLC. All Rights Reserved. - + ******************************* END LICENSE BLOCK ***************************/ package org.sensorhub.impl.service.consys.client; @@ -18,6 +18,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.StringReader; import java.net.Authenticator; @@ -26,6 +27,7 @@ import java.net.URISyntaxException; import java.net.http.HttpClient; import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; import java.net.http.HttpResponse.BodySubscriber; @@ -38,7 +40,14 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.function.Function; +import java.util.Objects; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import net.opengis.swe.v20.BinaryEncoding; import org.sensorhub.api.command.ICommandData; import org.sensorhub.api.command.ICommandStreamInfo; import org.sensorhub.api.data.IDataStreamInfo; @@ -51,6 +60,13 @@ import org.sensorhub.impl.service.consys.procedure.ProcedureBindingGeoJson; import org.sensorhub.impl.service.consys.procedure.ProcedureBindingSmlJson; import org.sensorhub.impl.service.consys.property.PropertyBindingJson; +import org.sensorhub.api.datastore.obs.IObsStore; +import org.sensorhub.api.system.ISystemWithDesc; +import org.sensorhub.impl.service.consys.ResourceParseException; +import org.sensorhub.impl.service.consys.obs.DataStreamBindingJson; +import org.sensorhub.impl.service.consys.obs.ObsBindingOmJson; +import org.sensorhub.impl.service.consys.obs.ObsBindingSweCommon; +import org.sensorhub.impl.service.consys.obs.ObsHandler; import org.sensorhub.impl.service.consys.resource.RequestContext; import org.sensorhub.impl.service.consys.resource.ResourceFormat; import org.sensorhub.impl.service.consys.resource.ResourceLink; @@ -75,16 +91,18 @@ public class ConSysApiClient static final String DEPLOYMENTS_COLLECTION = "deployments"; static final String DATASTREAMS_COLLECTION = "datastreams"; static final String CONTROLS_COLLECTION = "controls"; + static final String OBSERVATIONS_COLLECTION = "observations"; + static final String SUBSYSTEMS_COLLECTION = "subsystems"; static final String SF_COLLECTION = "fois"; - + HttpClient http; URI endpoint; - - + + static class InMemoryBufferStreamHandler implements StreamHandler { ByteArrayOutputStream os = new ByteArrayOutputStream(); - + public void setStartCallback(Runnable onStart) {} public void setCloseCallback(Runnable onClose) {} public void sendPacket() throws IOException {} @@ -92,8 +110,8 @@ public void close() {} public OutputStream getOutputStream() { return os; } public InputStream getAsInputStream() { return new ByteArrayInputStream(os.toByteArray()); } } - - + + protected ConSysApiClient() {} @@ -317,7 +335,7 @@ protected void endJsonCollection(JsonWriter writer, Collection lin /*---------*/ /* Systems */ /*---------*/ - + public CompletableFuture getSystemById(String id, ResourceFormat format) { return sendGetRequest(endpoint.resolve(SYSTEMS_COLLECTION + "/" + id), format, body -> { @@ -334,36 +352,46 @@ public CompletableFuture getSystemById(String id, ResourceForma } }); } - - - public CompletableFuture getSystemByUid(String uid, ResourceFormat format) - { - return sendGetRequest(endpoint.resolve(SYSTEMS_COLLECTION + "?uid=" + uid), format, body -> { - try - { + + // TODO Needs to parse top feature from FeatureCollection, instead of trying to parse FeatureCollection as ISystemWithDesc + public CompletableFuture getSystemByUid(String uid, ResourceFormat format) throws ExecutionException, InterruptedException { + var searchUID = sendGetRequest(endpoint.resolve(SYSTEMS_COLLECTION + "?uid=" + uid), format, body -> { + try { var ctx = new RequestContext(body); - var binding = new SystemBindingGeoJson(ctx, null, null, true); - return binding.deserialize(); - } - catch (IOException e) - { + + JsonObject bodyJson = JsonParser.parseReader(new InputStreamReader(ctx.getInputStream())).getAsJsonObject(); + JsonArray features = bodyJson.getAsJsonArray("items"); + + if(features != null && !features.isEmpty()) { + JsonObject firstFeature = features.get(0).getAsJsonObject(); + String featureID = firstFeature.get("id").getAsString(); + + return featureID; + } else { + return ""; + } + } catch (IOException e) { e.printStackTrace(); throw new CompletionException(e); } }); + var id = searchUID.get(); + if (Objects.equals(id, "")) + return null; + return getSystemById(id, format); } - - + + public CompletableFuture addSystem(ISystemWithDesc system) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - + var binding = new SystemBindingSmlJson(ctx, null, false); binding.serialize(null, system, false); - + return sendPostRequest( endpoint.resolve(SYSTEMS_COLLECTION), ResourceFormat.SML_JSON, @@ -374,39 +402,80 @@ public CompletableFuture addSystem(ISystemWithDesc system) throw new IllegalStateException("Error initializing binding", e); } } - - + + public CompletableFuture updateSystem(String systemID, ISystemWithDesc system) + { + try + { + var buffer = new InMemoryBufferStreamHandler(); + var ctx = new RequestContext(buffer); + + var binding = new SystemBindingSmlJson(ctx, null, false); + binding.serialize(null, system, false); + + return sendPutRequest( + endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemID), + ResourceFormat.SML_JSON, + buffer); + } + catch (IOException e) + { + throw new IllegalStateException("Error initializing binding", e); + } + } + + public CompletableFuture addSubSystem(String systemID, ISystemWithDesc system) + { + try + { + var buffer = new InMemoryBufferStreamHandler(); + var ctx = new RequestContext(buffer); + + var binding = new SystemBindingSmlJson(ctx, null, false); + binding.serialize(null, system, false); + + return sendPostRequest( + endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemID + "/" + SUBSYSTEMS_COLLECTION), + ResourceFormat.SML_JSON, + buffer); + } + catch (IOException e) + { + throw new IllegalStateException("Error initializing binding", e); + } + } + public CompletableFuture> addSystems(ISystemWithDesc... systems) { return addSystems(Arrays.asList(systems)); } - - + + public CompletableFuture> addSystems(Collection systems) { try { var buffer = new InMemoryBufferStreamHandler(); var ctx = new RequestContext(buffer); - + var binding = new SystemBindingSmlJson(ctx, null, false) { protected void startJsonCollection(JsonWriter writer) throws IOException { writer.beginArray(); } - + protected void endJsonCollection(JsonWriter writer, Collection links) throws IOException { writer.endArray(); writer.flush(); } }; - + binding.startCollection(); for (var sys: systems) binding.serialize(null, sys, false); binding.endCollection(Collections.emptyList()); - + return sendBatchPostRequest( endpoint.resolve(SYSTEMS_COLLECTION), ResourceFormat.SML_JSON, @@ -417,12 +486,12 @@ protected void endJsonCollection(JsonWriter writer, Collection lin throw new IllegalStateException("Error initializing binding", e); } } - - + + /*-------------*/ /* Datastreams */ /*-------------*/ - + public CompletableFuture addDataStream(String systemId, IDataStreamInfo datastream) { try @@ -432,7 +501,7 @@ public CompletableFuture addDataStream(String systemId, IDataStreamInfo var binding = new DataStreamBindingJson(ctx, null, null, false, Collections.emptyMap()); binding.serializeCreate(datastream); - + return sendPostRequest( endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + DATASTREAMS_COLLECTION), ResourceFormat.JSON, @@ -443,14 +512,14 @@ public CompletableFuture addDataStream(String systemId, IDataStreamInfo throw new IllegalStateException("Error initializing binding", e); } } - - + + public CompletableFuture> addDataStreams(String systemId, IDataStreamInfo... datastreams) { return addDataStreams(systemId, Arrays.asList(datastreams)); } - - + + public CompletableFuture> addDataStreams(String systemId, Collection datastreams) { try @@ -463,19 +532,19 @@ protected void startJsonCollection(JsonWriter writer) throws IOException { writer.beginArray(); } - + protected void endJsonCollection(JsonWriter writer, Collection links) throws IOException { writer.endArray(); writer.flush(); } }; - + binding.startCollection(); for (var ds: datastreams) binding.serializeCreate(ds); binding.endCollection(Collections.emptyList()); - + return sendBatchPostRequest( endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + DATASTREAMS_COLLECTION), ResourceFormat.JSON, @@ -486,12 +555,12 @@ protected void endJsonCollection(JsonWriter writer, Collection lin throw new IllegalStateException("Error initializing binding", e); } } - - + + /*-----------------*/ /* Control Streams */ /*-----------------*/ - + public CompletableFuture addControlStream(String systemId, ICommandStreamInfo cmdstream) { try @@ -501,7 +570,7 @@ public CompletableFuture addControlStream(String systemId, ICommandStrea var binding = new CommandStreamBindingJson(ctx, null, null, false); binding.serializeCreate(cmdstream); - + return sendPostRequest( endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + CONTROLS_COLLECTION), ResourceFormat.JSON, @@ -512,14 +581,14 @@ public CompletableFuture addControlStream(String systemId, ICommandStrea throw new IllegalStateException("Error initializing binding", e); } } - - + + public CompletableFuture> addControlStreams(String systemId, ICommandStreamInfo... cmdstreams) { return addControlStreams(systemId, Arrays.asList(cmdstreams)); } - - + + public CompletableFuture> addControlStreams(String systemId, Collection cmdstreams) { try @@ -532,19 +601,19 @@ protected void startJsonCollection(JsonWriter writer) throws IOException { writer.beginArray(); } - + protected void endJsonCollection(JsonWriter writer, Collection links) throws IOException { writer.endArray(); writer.flush(); } }; - + binding.startCollection(); for (var ds: cmdstreams) binding.serializeCreate(ds); binding.endCollection(Collections.emptyList()); - + return sendBatchPostRequest( endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + CONTROLS_COLLECTION), ResourceFormat.JSON, @@ -555,28 +624,55 @@ protected void endJsonCollection(JsonWriter writer, Collection lin throw new IllegalStateException("Error initializing binding", e); } } - - + + /*--------------*/ /* Observations */ /*--------------*/ - - public CompletableFuture pushObs(String datastreamId, IObsData cmd) + // TODO: Be able to push different kinds of observations such as video + public CompletableFuture pushObs(String dataStreamId, IDataStreamInfo dataStream, IObsData obs, IObsStore obsStore) { - return null; + try + { + ObsHandler.ObsHandlerContextData contextData = new ObsHandler.ObsHandlerContextData(); + contextData.dsInfo = dataStream; + + var buffer = new InMemoryBufferStreamHandler(); + var ctx = new RequestContext(buffer); + + if(dataStream != null && dataStream.getRecordEncoding() instanceof BinaryEncoding) { + ctx.setData(contextData); + ctx.setFormat(ResourceFormat.SWE_BINARY); + var binding = new ObsBindingSweCommon(ctx, null, false, obsStore); + binding.serialize(null, obs, false); + } else { + ctx.setFormat(ResourceFormat.OM_JSON); + var binding = new ObsBindingOmJson(ctx, null, false, obsStore); + binding.serializeCreate(obs); + } + + return sendPostRequest( + endpoint.resolve(DATASTREAMS_COLLECTION + "/" + dataStreamId + "/" + OBSERVATIONS_COLLECTION), + ctx.getFormat(), + buffer); + } + catch (IOException e) + { + throw new IllegalStateException("Error initializing binding", e); + } } - - + + /*----------*/ /* Commands */ /*----------*/ - + public CompletableFuture sendCommand(String controlId, ICommandData cmd) { return null; } - - + + protected CompletableFuture sendGetRequest(URI collectionUri, ResourceFormat format, Function bodyMapper) { var req = HttpRequest.newBuilder() @@ -584,7 +680,7 @@ protected CompletableFuture sendGetRequest(URI collectionUri, ResourceFor .GET() .header(HttpHeaders.ACCEPT, format.getMimeType()) .build(); - + var bodyHandler = new BodyHandler() { @Override public BodySubscriber apply(ResponseInfo resp) @@ -598,7 +694,7 @@ public BodySubscriber apply(ResponseInfo resp) }); } }; - + return http.sendAsync(req, bodyHandler) .thenApply(resp -> { if (resp.statusCode() == 200) @@ -607,17 +703,16 @@ public BodySubscriber apply(ResponseInfo resp) throw new CompletionException("HTTP error " + resp.statusCode(), null); }); } - - + protected CompletableFuture sendPostRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) { var req = HttpRequest.newBuilder() .uri(collectionUri) - .POST(HttpRequest.BodyPublishers.ofInputStream(() -> body.getAsInputStream())) + .POST(HttpRequest.BodyPublishers.ofInputStream(body::getAsInputStream)) .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()) .build(); - + return http.sendAsync(req, BodyHandlers.ofString()) .thenApply(resp -> { if (resp.statusCode() == 201 || resp.statusCode() == 303) @@ -631,8 +726,21 @@ protected CompletableFuture sendPostRequest(URI collectionUri, ResourceF throw new CompletionException(resp.body(), null); }); } - - + + protected CompletableFuture sendPutRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) + { + var req = HttpRequest.newBuilder() + .uri(collectionUri) + .PUT(HttpRequest.BodyPublishers.ofInputStream(() -> body.getAsInputStream())) + .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) + .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()) + .build(); + + return http.sendAsync(req, BodyHandlers.ofString()) + .thenApply(HttpResponse::statusCode); + } + + protected CompletableFuture> sendBatchPostRequest(URI collectionUri, ResourceFormat format, InMemoryBufferStreamHandler body) { var req = HttpRequest.newBuilder() @@ -640,7 +748,7 @@ protected CompletableFuture> sendBatchPostRequest(URI collectionUri, .POST(HttpRequest.BodyPublishers.ofInputStream(() -> body.getAsInputStream())) .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()) .build(); - + return http.sendAsync(req, BodyHandlers.ofString()) .thenApply(Lambdas.checked(resp -> { if (resp.statusCode() == 201 || resp.statusCode() == 303) @@ -662,27 +770,27 @@ protected CompletableFuture> sendBatchPostRequest(URI collectionUri, throw new ResourceParseException(resp.body()); })); } - - + + /* Builder stuff */ - + public static ConSysApiClientBuilder newBuilder(String endpoint) { Asserts.checkNotNull(endpoint, "endpoint"); return new ConSysApiClientBuilder(endpoint); } - - + + public static class ConSysApiClientBuilder extends BaseBuilder { HttpClient.Builder httpClientBuilder; - - + + ConSysApiClientBuilder(String endpoint) { this.instance = new ConSysApiClient(); this.httpClientBuilder = HttpClient.newBuilder(); - + try { if (!endpoint.endsWith("/")) @@ -694,15 +802,15 @@ public static class ConSysApiClientBuilder extends BaseBuilder throw new IllegalArgumentException("Invalid URI " + endpoint); } } - - + + public ConSysApiClientBuilder useHttpClient(HttpClient http) { instance.http = http; return this; } - - + + public ConSysApiClientBuilder simpleAuth(String user, char[] password) { if (!Strings.isNullOrEmpty(user)) @@ -715,16 +823,16 @@ protected PasswordAuthentication getPasswordAuthentication() { } }); } - + return this; } - - + + public ConSysApiClient build() { if (instance.http == null) instance.http = httpClientBuilder.build(); - + return instance; } } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java new file mode 100644 index 000000000..8dc01a237 --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java @@ -0,0 +1,45 @@ +package org.sensorhub.impl.service.consys.client; + +import org.sensorhub.api.client.ClientConfig; +import org.sensorhub.api.config.DisplayInfo; +import org.sensorhub.impl.comm.HTTPConfig; +import org.sensorhub.impl.comm.RobustIPConnectionConfig; +import org.sensorhub.impl.datastore.view.ObsSystemDatabaseViewConfig; + +public class ConSysApiClientConfig extends ClientConfig { + + @DisplayInfo(desc="Filtered view to select systems/datastreams to register with Connected Systems") + @DisplayInfo.Required + public ObsSystemDatabaseViewConfig dataSourceSelector; + + + @DisplayInfo(label="Connected Systems Endpoint", desc="Connected Systems endpoint where the requests are sent") + public HTTPConfig conSys = new HTTPConfig(); + + + @DisplayInfo(label="Connection Options") + public RobustIPConnectionConfig connection = new RobustIPConnectionConfig(); + + +// public static class ConSysConnectionConfig extends RobustIPConnectionConfig +// { +// @DisplayInfo(desc="Enable to use a persistent HTTP connection for InsertResult") +// public boolean usePersistentConnection; +// +// +// @DisplayInfo(desc="Maximum number of records in upload queue (used to compensate for variable bandwidth)") +// public int maxQueueSize = 10; +// +// +// @DisplayInfo(desc="Maximum number of stream errors before we try to reconnect to remote server") +// public int maxConnectErrors = 10; +// } + + + public ConSysApiClientConfig() + { + this.moduleClass = ConSysApiClientModule.class.getCanonicalName(); + this.conSys.resourcePath = "/sensorhub/api"; + } + +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java new file mode 100644 index 000000000..14397d48b --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java @@ -0,0 +1,37 @@ +package org.sensorhub.impl.service.consys.client; + +import org.sensorhub.api.module.IModule; +import org.sensorhub.api.module.IModuleProvider; +import org.sensorhub.api.module.ModuleConfig; +import org.sensorhub.impl.module.JarModuleProvider; + +public class ConSysApiClientDescriptor extends JarModuleProvider implements IModuleProvider { + + @Override + public String getModuleName() + { + return "Connected Systems Client"; + } + + + @Override + public String getModuleDescription() + { + return "Connected Systems client for pushing observations to a remote SensorHub"; + } + + + @Override + public Class> getModuleClass() + { + return ConSysApiClientModule.class; + } + + + @Override + public Class getModuleConfigClass() + { + return ConSysApiClientConfig.class; + } + +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java new file mode 100644 index 000000000..a24ed4799 --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -0,0 +1,464 @@ +package org.sensorhub.impl.service.consys.client; + +import com.google.common.base.Strings; +import org.sensorhub.api.client.ClientException; +import org.sensorhub.api.client.IClientModule; +import org.sensorhub.api.common.BigId; +import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.api.data.*; +import org.sensorhub.api.database.IObsSystemDatabase; +import org.sensorhub.api.datastore.obs.DataStreamFilter; +import org.sensorhub.api.datastore.obs.ObsFilter; +import org.sensorhub.api.datastore.system.SystemFilter; +import org.sensorhub.api.event.EventUtils; +import org.sensorhub.api.system.ISystemWithDesc; +import org.sensorhub.api.system.SystemAddedEvent; +import org.sensorhub.api.system.SystemChangedEvent; +import org.sensorhub.api.system.SystemEnabledEvent; +import org.sensorhub.api.system.SystemRemovedEvent; +import org.sensorhub.api.system.SystemDisabledEvent; +import org.sensorhub.api.system.SystemEvent; +import org.sensorhub.impl.module.AbstractModule; +import org.sensorhub.impl.service.consys.resource.ResourceFormat; +import org.vast.util.Asserts; + +import java.net.Authenticator; +import java.net.HttpURLConnection; +import java.net.PasswordAuthentication; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow; +import java.util.concurrent.CompletableFuture; + +public class ConSysApiClientModule extends AbstractModule implements IClientModule { + + IObsSystemDatabase dataBaseView; + String apiEndpointUrl; + ConSysApiClient client; + Map registeredSystems; + NavigableMap dataStreams; + Flow.Subscription registrySubscription; + + public static class SystemRegInfo + { + private String systemID; + private BigId internalID; + private Flow.Subscription subscription; + private ISystemWithDesc system; + } + + public static class StreamInfo + { + private IDataStreamInfo dataStream; + private String dataStreamID; + private String topicID; + private BigId internalID; + private String sysUID; + private Flow.Subscription subscription; + } + + public ConSysApiClientModule() + { +// this.startAsync = true; + this.registeredSystems = new ConcurrentHashMap<>(); + this.dataStreams = new ConcurrentSkipListMap<>(); + } + + @Override + public void setConfiguration(ConSysApiClientConfig config) + { + super.setConfiguration(config); + + String scheme = "http"; + if (config.conSys.enableTLS) + scheme += "s"; + apiEndpointUrl = scheme + "://" + config.conSys.remoteHost + ":" + config.conSys.remotePort; + if (config.conSys.resourcePath != null) + { + if (config.conSys.resourcePath.charAt(0) != '/') + apiEndpointUrl += '/'; + apiEndpointUrl += config.conSys.resourcePath; + } + } + + @Override + protected void doInit() throws SensorHubException + { + this.dataBaseView = config.dataSourceSelector.getFilteredView(getParentHub()); + + this.client = ConSysApiClient. + newBuilder(apiEndpointUrl) + .simpleAuth(config.conSys.user, !config.conSys.password.isEmpty() ? config.conSys.password.toCharArray() : null) + .build(); + + // TODO: Other initialization + } + + @Override + protected void doStart() throws SensorHubException { + // Check if endpoint is available + try{ + HttpURLConnection urlConnection = (HttpURLConnection) client.endpoint.toURL().openConnection(); + if (!Strings.isNullOrEmpty(config.conSys.user)) { + urlConnection.setAuthenticator(new Authenticator() { + @Override + public PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(config.conSys.user, config.conSys.password != null ? config.conSys.password.toCharArray() : new char[0]); + } + }); + } + urlConnection.connect(); + Asserts.checkArgument(urlConnection.getResponseCode() == HttpURLConnection.HTTP_OK); + } catch (Exception e) { + throw new SensorHubException("Unable to establish connection to Connected Systems endpoint"); + } + + reportStatus("Connection to " + apiEndpointUrl + " was made successfully"); + + dataBaseView.getSystemDescStore().selectEntries( + new SystemFilter.Builder() + .withNoParent() + .build()) + .forEach((entry) -> { + var systemRegInfo = registerSystem(entry.getKey().getInternalID(), entry.getValue()); + checkSubSystems(systemRegInfo); + registerSystemDataStreams(systemRegInfo); + }); + + subscribeToRegistryEvents(); + + for (var stream : dataStreams.values()) + startStream(stream); + } + + @Override + protected void doStop() throws SensorHubException { + super.doStop(); + + for(var stream : dataStreams.values()) + stopStream(stream); + } + + protected void subscribeToRegistryEvents() + { + getParentHub().getEventBus().newSubscription(SystemEvent.class) + .withTopicID(EventUtils.getSystemRegistryTopicID()) + .withEventType(SystemEvent.class) + .subscribe(this::handleEvent) + .thenAccept(sub -> { + registrySubscription = sub; + sub.request(Long.MAX_VALUE); + }); + } + + protected void checkSubSystems(SystemRegInfo parentSystemRegInfo) + { + dataBaseView.getSystemDescStore().selectEntries( + new SystemFilter.Builder() + .withParents(parentSystemRegInfo.internalID) + .build()) + .forEach((entry) -> { + var systemRegInfo = registerSubSystem(entry.getKey().getInternalID(), parentSystemRegInfo, entry.getValue()); + registerSystemDataStreams(systemRegInfo); + }); + } + + private String tryUpdateSystem(ISystemWithDesc system) + { + try { + var uidRequest = client.getSystemByUid(system.getUniqueIdentifier(), ResourceFormat.JSON); + String systemID; + if(uidRequest != null) { + var oldSys = uidRequest.get(); + systemID = oldSys.getId(); + var responseCode = client.updateSystem(systemID, system).get(); + boolean successful = responseCode == 204; + if(!successful) + throw new ClientException("Failed to update resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID); + return systemID; + } + } catch (ExecutionException | InterruptedException | ClientException e) { + throw new RuntimeException(e); + } + return null; + } + + private void disableSystem(String uid, boolean remove) + { + var sysInfo = remove ? registeredSystems.remove(uid) : registeredSystems.get(uid); + if(sysInfo != null) + { + if(sysInfo.subscription != null) + { + sysInfo.subscription.cancel(); + sysInfo.subscription = null; + getLogger().debug("Unsubscribed from system {}", uid); + } + + var sysDataStreams = dataStreams.subMap(uid, uid + "\uffff"); + for(var streamInfo : sysDataStreams.values()) + stopStream(streamInfo); + + if(remove) + getLogger().info("Removed system {}", uid); + } + } + + protected SystemRegInfo registerSystem(BigId systemInternalID, ISystemWithDesc system) + { + try { + String systemID = tryUpdateSystem(system); + if(systemID == null) + systemID = client.addSystem(system).get(); + + return registerSystemInfo(systemID, systemInternalID, system); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + protected SystemRegInfo registerSubSystem(BigId systemInternalID, SystemRegInfo parentSystem, ISystemWithDesc system) + { + try { + var getParent = client.getSystemById(parentSystem.systemID, ResourceFormat.JSON); + var parent = getParent.get(); + if(parent == null) + throw new ClientException("Could not retrieve parent system " + parentSystem.systemID); + + String systemID = tryUpdateSystem(system); + if(systemID == null) + systemID = client.addSubSystem(parentSystem.systemID, system).get(); + + return registerSystemInfo(systemID, systemInternalID, system); + } catch (InterruptedException | ExecutionException | ClientException e) { + throw new RuntimeException(e); + } + } + + private SystemRegInfo registerSystemInfo(String systemID, BigId systemInternalID, ISystemWithDesc system) + { + SystemRegInfo systemRegInfo = new SystemRegInfo(); + systemRegInfo.systemID = systemID; + systemRegInfo.internalID = systemInternalID; + systemRegInfo.system = system; + + getParentHub().getEventBus().newSubscription(SystemEvent.class) + .withTopicID(EventUtils.getSystemStatusTopicID(system.getUniqueIdentifier())) + .withEventType(SystemEvent.class) + .subscribe(this::handleEvent) + .thenAccept(sub -> { + systemRegInfo.subscription = sub; + sub.request(Long.MAX_VALUE); + }); + + registeredSystems.put(system.getUniqueIdentifier(), systemRegInfo); + return systemRegInfo; + } + + protected List registerSystemDataStreams(SystemRegInfo system) + { + List addedStreams = new ArrayList<>(); + + dataBaseView.getDataStreamStore().selectEntries( + new DataStreamFilter.Builder() + .withSystems(new SystemFilter.Builder() + .withUniqueIDs(system.system.getUniqueIdentifier()) + .build()) + .build()) + .forEach((entry) -> { + if(Objects.equals(entry.getValue().getSystemID().getUniqueID(), system.system.getUniqueIdentifier())) + addedStreams.add(registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue())); + }); + return addedStreams; + } + + protected StreamInfo registerDataStream(BigId dsId, String systemID, IDataStreamInfo dataStream) + { + var dsTopicId = EventUtils.getDataStreamDataTopicID(dataStream); + + StreamInfo streamInfo = new StreamInfo(); + try { + streamInfo.dataStreamID = client.addDataStream(systemID, dataStream).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + + streamInfo.dataStream = dataStream; + streamInfo.topicID = dsTopicId; + streamInfo.sysUID = dataStream.getSystemID().getUniqueID(); + streamInfo.internalID = dsId; + + dataStreams.put(dsTopicId, streamInfo); + return streamInfo; + } + + protected void addAndStartStream(String uid, String outputName) + { + try + { + var sysInfo = registeredSystems.get(uid); + var dsEntry = dataBaseView.getDataStreamStore().getLatestVersionEntry(uid, outputName); + if(sysInfo != null && dsEntry != null) + { + var dsId = dsEntry.getKey().getInternalID(); + var dsInfo = dsEntry.getValue(); + + var streamInfo = registerDataStream(dsId, sysInfo.systemID, dsInfo); + startStream(streamInfo); + } + } + catch (ClientException e) + { + reportError(e.getMessage(), e.getCause()); + } + } + + protected void disableDataStream(String uid, String outputName, boolean remove) + { + var dsSourceId = EventUtils.getDataStreamDataTopicID(uid, outputName); + var streamInfo = remove ? dataStreams.remove(dsSourceId) : dataStreams.get(dsSourceId); + if(streamInfo != null) + { + stopStream(streamInfo); + if(remove) + getLogger().info("Removed datastream {}", dsSourceId); + } + } + + protected synchronized void startStream(StreamInfo streamInfo) throws ClientException + { + try + { + if(streamInfo.subscription != null) + return; + + getParentHub().getEventBus().newSubscription(ObsEvent.class) + .withTopicID(streamInfo.topicID) + .withEventType(ObsEvent.class) + .subscribe(e -> handleEvent(e, streamInfo)) + .thenAccept(subscription -> { + streamInfo.subscription = subscription; + subscription.request(Long.MAX_VALUE); + + // Push latest observation + this.dataBaseView.getObservationStore().select(new ObsFilter.Builder() + .withDataStreams(streamInfo.internalID) + .withLatestResult() + .build()) + .forEach(obs -> + client.pushObs(streamInfo.dataStreamID, streamInfo.dataStream, obs, this.dataBaseView.getObservationStore())); + + getLogger().info("Starting Connected Systems data push for stream {} with UID {} to Connected Systems endpoint {}", + streamInfo.dataStreamID, streamInfo.sysUID, apiEndpointUrl); + }); + } catch (Exception e) + { + throw new ClientException("Error starting data push for stream " + streamInfo.topicID, e); + } + } + + protected void stopStream(StreamInfo streamInfo) + { + if(streamInfo.subscription != null) + { + streamInfo.subscription.cancel(); + streamInfo.subscription = null; + } + + // TODO Check other stuff + } + + @Override + public boolean isConnected() + { + return false; + } + + protected void handleEvent(final ObsEvent e, StreamInfo streamInfo) + { + var length = e.getObservations().length; + for(var obs : e.getObservations()) + client.pushObs(streamInfo.dataStreamID, streamInfo.dataStream, obs, this.dataBaseView.getObservationStore()); + } + + protected void handleEvent(final SystemEvent e) + { + // sensor description updated + if (e instanceof SystemChangedEvent) + { + CompletableFuture.runAsync(() -> { + var system = dataBaseView.getSystemDescStore().getCurrentVersion(e.getSystemUID()); + if(system != null) + tryUpdateSystem(system); + }); + } + + // system events + else if (e instanceof SystemAddedEvent || e instanceof SystemEnabledEvent) + { + CompletableFuture.runAsync(() -> { + var system = dataBaseView.getSystemDescStore().getCurrentVersionEntry(e.getSystemUID()); + if(system != null) + { + var systemRegInfo = registerSystem(system.getKey().getInternalID(), system.getValue()); + checkSubSystems(systemRegInfo); + var newStreams = registerSystemDataStreams(systemRegInfo); + for(var streamInfo : newStreams) { + try { + startStream(streamInfo); + } catch (ClientException ex) { + throw new RuntimeException(ex); + } + } + } + }); + } + + else if (e instanceof SystemDisabledEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + disableSystem(sysUID, false); + }); + } + + else if (e instanceof SystemRemovedEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + disableSystem(sysUID, true); + }); + } + + // datastream events + else if (e instanceof DataStreamAddedEvent || e instanceof DataStreamEnabledEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + var outputName = ((DataStreamEvent) e).getOutputName(); + addAndStartStream(sysUID, outputName); + }); + } + + else if (e instanceof DataStreamDisabledEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + var outputName = ((DataStreamEvent) e).getOutputName(); + disableDataStream(sysUID, outputName, false); + }); + } + + else if (e instanceof DataStreamRemovedEvent) + { + CompletableFuture.runAsync(() -> { + var sysUID = e.getSystemUID(); + var outputName = ((DataStreamEvent) e).getOutputName(); + disableDataStream(sysUID, outputName, true); + }); + } + } + +} diff --git a/sensorhub-service-consys/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider b/sensorhub-service-consys/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider index 18a88ed7f..8e42cbcc6 100644 --- a/sensorhub-service-consys/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider +++ b/sensorhub-service-consys/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider @@ -1 +1,2 @@ -org.sensorhub.impl.service.consys.ConSysApiServiceDescriptor \ No newline at end of file +org.sensorhub.impl.service.consys.ConSysApiServiceDescriptor +org.sensorhub.impl.service.consys.client.ConSysApiClientDescriptor \ No newline at end of file