diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java index d395a207..5d9898e9 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java @@ -487,7 +487,7 @@ private void cleanupConnections(final MqttEndpoint endpoint, tenantConnectionManager.closeEndpoint(tenantId, endpoint) .onSuccess(amqpLinkClosed -> { if (amqpLinkClosed) { - log.info("closing AMQP connection for tenant [{}]", tenantId); + log.info("closed AMQP connection for tenant [{}]", tenantId); } }); } diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java index 63e78115..34952d69 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java @@ -36,7 +36,7 @@ public interface MultiTenantConnectionManager { /** - * Connect to Hono's AMQP adapter with the given configuration. + * Connects to Hono's AMQP adapter with the given configuration. * * @param tenantId The tenant to connect. * @param vertx The Vert.x instance to use for the connection. diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java index e29652cc..7bba53f1 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java @@ -42,7 +42,11 @@ public class MultiTenantConnectionManagerImpl implements MultiTenantConnectionMa @Override public Future connect(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) { - connectionsPerTenant.computeIfAbsent(tenantId, k -> new TenantConnections(k, vertx, clientConfig).connect()); + connectionsPerTenant.computeIfAbsent(tenantId, k -> { + final TenantConnections tenantConnections = new TenantConnections(k, vertx, clientConfig); + tenantConnections.connect(); + return tenantConnections; + }); return getTenantConnections(tenantId) .compose(tenantConnections -> tenantConnections.isConnected(clientConfig.getConnectTimeout())) @@ -57,10 +61,7 @@ public Future connect(final String tenantId, final Vertx vertx, final Clie @Override public Future addEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { return getTenantConnections(tenantId) - .map(tenantConnections -> { - tenantConnections.addEndpoint(mqttEndpoint); - return null; - }); + .compose(tenantConnections -> tenantConnections.addEndpoint(mqttEndpoint)); } @Override @@ -115,7 +116,7 @@ public Future createCommandConsumer(final String tenantId, } - private Future getTenantConnections(final String tenantId) throws IllegalArgumentException { + private Future getTenantConnections(final String tenantId) { final TenantConnections tenantConnections = connectionsPerTenant.get(tenantId); if (tenantConnections == null) { return Future.failedFuture("tenant [" + tenantId + "] is not connected"); @@ -124,9 +125,7 @@ private Future getTenantConnections(final String tenantId) th } } - private Future getAmqpAdapterClientFactory(final String tenantId) - throws IllegalStateException, IllegalArgumentException { - return getTenantConnections(tenantId) - .compose(tenantConnections -> Future.succeededFuture(tenantConnections.getAmqpAdapterClientFactory())); + private Future getAmqpAdapterClientFactory(final String tenantId) { + return getTenantConnections(tenantId).compose(TenantConnections::getAmqpAdapterClientFactory); } } diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java index e86b1146..c6697e66 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java @@ -16,6 +16,7 @@ import java.util.ArrayList; import java.util.List; +import org.eclipse.hono.client.ConnectionLifecycle; import org.eclipse.hono.client.HonoConnection; import org.eclipse.hono.client.ServerErrorException; import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; @@ -44,6 +45,7 @@ class TenantConnections { private final AmqpAdapterClientFactory amqpAdapterClientFactory; private final Logger log = LoggerFactory.getLogger(getClass()); + private final String tenantId; private boolean closed = false; @@ -55,37 +57,40 @@ class TenantConnections { * @param clientConfig The client configuration to be used by the HonoConnection. */ TenantConnections(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) { - this(AmqpAdapterClientFactory.create(HonoConnection.newConnection(vertx, clientConfig), tenantId)); + this(AmqpAdapterClientFactory.create(HonoConnection.newConnection(vertx, clientConfig), tenantId), tenantId); } /** * Creates a new instance for the given {@link AmqpAdapterClientFactory}. + *

+ * This constructor is for testing purposes only. * * @param amqpAdapterClientFactory The AmqpAdapterClientFactory to use for creating AMQP clients. + * @param tenantId The ID of the tenant whose connections are to be managed */ - TenantConnections(final AmqpAdapterClientFactory amqpAdapterClientFactory) { + TenantConnections(final AmqpAdapterClientFactory amqpAdapterClientFactory, final String tenantId) { this.amqpAdapterClientFactory = amqpAdapterClientFactory; + this.tenantId = tenantId; } /** * Opens a connection to Hono's AMQP protocol adapter for the tenant to be managed. * - * @return This instance for fluent use. - * @throws IllegalStateException if this instance is already closed. + * @return A future indicating the outcome of the operation. */ - public TenantConnections connect() { - getAmqpAdapterClientFactory().connect().onSuccess(con -> log.debug("Connected to AMQP adapter")); - return this; + public Future connect() { + return getAmqpAdapterClientFactory().compose(ConnectionLifecycle::connect) + .onSuccess(con -> log.debug("Connected to AMQP adapter")); } /** * Adds an MQTT endpoint for the tenant. * * @param mqttEndpoint The endpoint to add. + * @return A future indicating the outcome of the operation. */ - public void addEndpoint(final MqttEndpoint mqttEndpoint) { - checkNotClosed(); - mqttEndpoints.add(mqttEndpoint); + public Future addEndpoint(final MqttEndpoint mqttEndpoint) { + return failIfClosed().onSuccess(v -> mqttEndpoints.add(mqttEndpoint)); } /** @@ -94,7 +99,6 @@ public void addEndpoint(final MqttEndpoint mqttEndpoint) { * * @param mqttEndpoint The endpoint to be closed. * @return {@code true} if the AMQP connection has been closed. - * @throws IllegalStateException if this instance is already closed. */ public boolean closeEndpoint(final MqttEndpoint mqttEndpoint) { @@ -139,27 +143,29 @@ private void closeThisInstance() { * * @param connectTimeout The maximum number of milliseconds to wait for an ongoing connection attempt to finish. * @return A succeeded future if this connection is established. Otherwise, the future will be failed with a - * {@link ServerErrorException}. - * @throws IllegalStateException if this instance is already closed. + * {@link ServerErrorException}, or an {@link IllegalStateException} if this instance is already closed. */ public Future isConnected(final long connectTimeout) { - return getAmqpAdapterClientFactory().isConnected(connectTimeout); + return getAmqpAdapterClientFactory().compose(f -> f.isConnected(connectTimeout)); } /** * Returns the AmqpAdapterClientFactory for the tenant. * - * @return The AmqpAdapterClientFactory. - * @throws IllegalStateException if this instance is already closed. + * @return A future containing the AmqpAdapterClientFactory, or, if this instance is already closed, a failed + * future. */ - public AmqpAdapterClientFactory getAmqpAdapterClientFactory() throws IllegalStateException { - checkNotClosed(); - return amqpAdapterClientFactory; + public Future getAmqpAdapterClientFactory() { + return failIfClosed().map(amqpAdapterClientFactory); } - private void checkNotClosed() throws IllegalStateException { + private Future failIfClosed() { if (closed) { - throw new IllegalStateException("all connections for this tenant are already closed"); + final Exception ex = new IllegalStateException("connections for this tenant are already closed"); + log.warn("This should not happen", ex); + return Future.failedFuture(ex); + } else { + return Future.succeededFuture(); } } } diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java index f0e976a3..43aeb36f 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java @@ -14,14 +14,13 @@ package org.eclipse.hono.gateway.sdk.mqtt2amqp; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.eclipse.hono.client.HonoConnection; import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,20 +43,10 @@ public class TenantConnectionsTest { public void setUp() { amqpAdapterClientFactory = mock(AmqpAdapterClientFactory.class); - tenantConnections = new TenantConnections(amqpAdapterClientFactory); + tenantConnections = new TenantConnections(amqpAdapterClientFactory, "a-tenant-id"); endpoint = mock(MqttEndpoint.class); } - /** - * Verifies that the connect method returns the instance. - */ - @Test - public void connectReturnsTheInstance() { - when(amqpAdapterClientFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class))); - - assertThat(tenantConnections.connect()).isEqualTo(tenantConnections); - } - /** * Verifies that adding an endpoint works. */ @@ -90,7 +79,7 @@ public void instanceIsClosedWhenClosingLastEndpoint() { tenantConnections.closeEndpoint(endpoint); - Assertions.assertThrows(IllegalStateException.class, () -> tenantConnections.getAmqpAdapterClientFactory()); + assertThat(tenantConnections.getAmqpAdapterClientFactory().failed()).isTrue(); } /** @@ -103,7 +92,7 @@ public void instanceIsOpenWhenClosingEndpointThatIsNotTheLastOne() { tenantConnections.closeEndpoint(endpoint); - assertThat(tenantConnections.getAmqpAdapterClientFactory()).isNotNull(); + assertThat(tenantConnections.getAmqpAdapterClientFactory().succeeded()).isTrue(); } /** @@ -116,7 +105,7 @@ public void instanceIsClosedWhenInvokingClose() { tenantConnections.closeAllConnections(); - Assertions.assertThrows(IllegalStateException.class, () -> tenantConnections.getAmqpAdapterClientFactory()); + assertThat(tenantConnections.getAmqpAdapterClientFactory().failed()).isTrue(); } /** @@ -124,8 +113,22 @@ public void instanceIsClosedWhenInvokingClose() { */ @Test public void isConnectedDelegatesToClientFactory() { + when(amqpAdapterClientFactory.isConnected(anyLong())).thenReturn(Future.succeededFuture()); + tenantConnections.isConnected(5L); verify(amqpAdapterClientFactory).isConnected(eq(5L)); } + /** + * Verifies that the connect() method delegates the call to the client factory. + */ + @Test + public void connectDelegatesToClientFactory() { + when(amqpAdapterClientFactory.connect()).thenReturn(Future.succeededFuture()); + + tenantConnections.connect(); + verify(amqpAdapterClientFactory).connect(); + + } + }