diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java index 3f867ab7..46c8a66a 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java @@ -62,7 +62,7 @@ public interface MultiTenantConnectionManager { boolean closeEndpoint(String tenantId, MqttEndpoint mqttEndpoint); /** - * Closes all connections, MQTT connections as well as AMQP for all tenants. + * Closes all connections, MQTT connections as well as AMQP connections for all tenants. */ void closeAllTenants(); diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java index 71bffca8..9e62baf9 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java @@ -17,6 +17,7 @@ import java.util.List; import org.eclipse.hono.client.HonoConnection; +import org.eclipse.hono.client.ServerErrorException; import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; import org.eclipse.hono.config.ClientConfigProperties; import org.slf4j.Logger; @@ -29,6 +30,10 @@ /** * Manages all connections of one tenant, MQTT connections of devices as well as the AMQP connection to Hono's AMQP * adapter. + * + * By invoking {@link #connect()} an AMQP client for the tenant is connected. Each MQTT endpoint needs to be added to + * keep track of all MQTT connections belonging to the tenant. When the last MQTT endpoint for the tenant is closed, the + * AMQP client - and thus this instance - is closed automatically. *

* Note: do not re-use an instance if it is already closed. */ @@ -42,24 +47,55 @@ class TenantConnections { private boolean closed = false; + /** + * Creates a new instance with a new {@link AmqpAdapterClientFactory} and a new {@link HonoConnection}. + * + * @param tenantId The ID of the tenant whose connections are to be managed + * @param vertx The Vert.x instance to be used by the HonoConnection. + * @param clientConfig The client configuration to be used by the HonoConnection. + */ TenantConnections(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) { this(AmqpAdapterClientFactory.create(HonoConnection.newConnection(vertx, clientConfig), tenantId)); } + /** + * Creates a new instance for the given {@link AmqpAdapterClientFactory}. + * + * @param amqpAdapterClientFactory The AmqpAdapterClientFactory to use for creating AMQP clients. + */ TenantConnections(final AmqpAdapterClientFactory amqpAdapterClientFactory) { this.amqpAdapterClientFactory = amqpAdapterClientFactory; } + /** + * Opens a connection to Hono's AMQP protocol adapter for the tenant to be managed. + * + * @return This instance for fluent use. + * @throws IllegalStateException if this instance is already closed. + */ public TenantConnections connect() { getAmqpAdapterClientFactory().connect().onSuccess(con -> log.debug("Connected to AMQP adapter")); return this; } + /** + * Adds an MQTT endpoint for the tenant. + * + * @param mqttEndpoint The endpoint to add. + */ public void addEndpoint(final MqttEndpoint mqttEndpoint) { checkNotClosed(); mqttEndpoints.add(mqttEndpoint); } + /** + * Closes the given MQTT endpoint and if there are no other MQTT endpoints present, it closes the AMQP client and + * this instance. + * + * @param mqttEndpoint The endpoint to be closed. + * @return {@code true} if the AMQP connection has been closed. + * @throws IllegalStateException if this instance is already closed. + */ public boolean closeEndpoint(final MqttEndpoint mqttEndpoint) { closeEndpointIfConnected(mqttEndpoint); @@ -75,6 +111,8 @@ public boolean closeEndpoint(final MqttEndpoint mqttEndpoint) { /** * Closes all MQTT endpoints and the AMQP connection. + * + * @throws IllegalStateException if this instance is already closed. */ public void closeAllConnections() { log.info("closing all AMQP connections"); @@ -98,10 +136,24 @@ private void closeThisInstance() { closed = true; } + /** + * Checks whether the AMQP connection is currently established. + * + * @param connectTimeout The maximum number of milliseconds to wait for an ongoing connection attempt to finish. + * @return A succeeded future if this connection is established. Otherwise, the future will be failed with a + * {@link ServerErrorException}. + * @throws IllegalStateException if this instance is already closed. + */ public Future isConnected(final long connectTimeout) { return getAmqpAdapterClientFactory().isConnected(connectTimeout); } + /** + * Returns the AmqpAdapterClientFactory for the tenant. + * + * @return The AmqpAdapterClientFactory. + * @throws IllegalStateException if this instance is already closed. + */ public AmqpAdapterClientFactory getAmqpAdapterClientFactory() throws IllegalStateException { checkNotClosed(); return amqpAdapterClientFactory;