diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java index 5ded80b0..d395a207 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java @@ -454,9 +454,9 @@ private Future connectGatewayToAmqpAdapter(final String tenantId, final ClientConfigProperties clientConfig, final MqttEndpoint endpoint) { return tenantConnectionManager.connect(tenantId, vertx, clientConfig) - .onSuccess(v -> tenantConnectionManager.addEndpoint(tenantId, endpoint)) .onFailure(e -> log.info("Failed to connect to Hono [tenant-id: {}, username: {}]", tenantId, - clientConfig.getUsername())); + clientConfig.getUsername())) + .compose(v -> tenantConnectionManager.addEndpoint(tenantId, endpoint)); } @@ -484,11 +484,12 @@ private void cleanupConnections(final MqttEndpoint endpoint, cmdSubscriptionsManager.removeAllSubscriptions(); final String tenantId = authenticatedDevice.getTenantId(); - final boolean amqpLinkClosed = tenantConnectionManager.closeEndpoint(tenantId, endpoint); - - if (amqpLinkClosed) { - log.info("closing AMQP connection for tenant [{}]", tenantId); - } + tenantConnectionManager.closeEndpoint(tenantId, endpoint) + .onSuccess(amqpLinkClosed -> { + if (amqpLinkClosed) { + log.info("closing AMQP connection for tenant [{}]", tenantId); + } + }); } /** diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java index 46c8a66a..63e78115 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java @@ -29,6 +29,9 @@ /** * Manages connections for multiple tenants. + *

+ * NB The {@link #connect(String, Vertx, ClientConfigProperties)} method needs to be invoked before calling any + * of the other methods. */ public interface MultiTenantConnectionManager { @@ -48,8 +51,10 @@ public interface MultiTenantConnectionManager { * * @param tenantId The tenant to which the endpoint belongs. * @param mqttEndpoint The endpoint to be added. + * @return A future indicating the outcome of the operation. The future will succeed if the endpoint has been added + * successfully. Otherwise the future will fail with a failure message indicating the cause of the failure. */ - void addEndpoint(String tenantId, MqttEndpoint mqttEndpoint); + Future addEndpoint(String tenantId, MqttEndpoint mqttEndpoint); /** * Closes the given MQTT endpoint and if there are no other open endpoints for this tenant, it closes the @@ -57,9 +62,11 @@ public interface MultiTenantConnectionManager { * * @param tenantId The tenant to which the endpoint belongs. * @param mqttEndpoint The endpoint to be closed. - * @return {@code true} if the AMQP connection and all endpoints have been closed. + * @return A future indicating the outcome of the operation. The future will succeed with a boolean that is + * {@code true} if the AMQP connection (and all MQTT endpoints) have been closed. If an error occurs, the + * future will fail with a failure message indicating the cause of the failure. */ - boolean closeEndpoint(String tenantId, MqttEndpoint mqttEndpoint); + Future closeEndpoint(String tenantId, MqttEndpoint mqttEndpoint); /** * Closes all connections, MQTT connections as well as AMQP connections for all tenants. diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java index 3ec95a2a..e29652cc 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java @@ -45,7 +45,7 @@ public Future connect(final String tenantId, final Vertx vertx, final Clie connectionsPerTenant.computeIfAbsent(tenantId, k -> new TenantConnections(k, vertx, clientConfig).connect()); return getTenantConnections(tenantId) - .isConnected(clientConfig.getConnectTimeout()) + .compose(tenantConnections -> tenantConnections.isConnected(clientConfig.getConnectTimeout())) .onFailure(ex -> { final TenantConnections failedTenant = connectionsPerTenant.remove(tenantId); if (failedTenant != null) { @@ -55,19 +55,25 @@ public Future connect(final String tenantId, final Vertx vertx, final Clie } @Override - public void addEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { - getTenantConnections(tenantId).addEndpoint(mqttEndpoint); + public Future addEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { + return getTenantConnections(tenantId) + .map(tenantConnections -> { + tenantConnections.addEndpoint(mqttEndpoint); + return null; + }); } @Override - public boolean closeEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { + public Future closeEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { - final boolean amqpLinkClosed = getTenantConnections(tenantId).closeEndpoint(mqttEndpoint); - if (amqpLinkClosed) { - connectionsPerTenant.remove(tenantId); - } + return getTenantConnections(tenantId) + .map(tenantConnections -> tenantConnections.closeEndpoint(mqttEndpoint)) + .onSuccess(amqpLinkClosed -> { + if (amqpLinkClosed) { + connectionsPerTenant.remove(tenantId); + } + }); - return amqpLinkClosed; } @Override @@ -78,64 +84,49 @@ public void closeAllTenants() { @Override public Future getOrCreateTelemetrySender(final String tenantId) { - try { - return getAmqpAdapterClientFactory(tenantId).getOrCreateTelemetrySender(); - } catch (Exception ex) { - return Future.failedFuture(ex); - } + return getAmqpAdapterClientFactory(tenantId).compose(AmqpAdapterClientFactory::getOrCreateTelemetrySender); } @Override public Future getOrCreateEventSender(final String tenantId) { - try { - return getAmqpAdapterClientFactory(tenantId).getOrCreateEventSender(); - } catch (Exception ex) { - return Future.failedFuture(ex); - } + return getAmqpAdapterClientFactory(tenantId).compose(AmqpAdapterClientFactory::getOrCreateEventSender); } @Override public Future getOrCreateCommandResponseSender(final String tenantId) { - try { - return getAmqpAdapterClientFactory(tenantId).getOrCreateCommandResponseSender(); - } catch (Exception ex) { - return Future.failedFuture(ex); - } + return getAmqpAdapterClientFactory(tenantId) + .compose(AmqpAdapterClientFactory::getOrCreateCommandResponseSender); } @Override public Future createDeviceSpecificCommandConsumer(final String tenantId, final String deviceId, final Consumer messageHandler) { - try { - return getAmqpAdapterClientFactory(tenantId).createDeviceSpecificCommandConsumer(deviceId, messageHandler); - } catch (Exception ex) { - return Future.failedFuture(ex); - } + return getAmqpAdapterClientFactory(tenantId) + .compose(factory -> factory.createDeviceSpecificCommandConsumer(deviceId, messageHandler)); + } @Override public Future createCommandConsumer(final String tenantId, final Consumer messageHandler) { - try { - return getAmqpAdapterClientFactory(tenantId).createCommandConsumer(messageHandler); - } catch (Exception ex) { - return Future.failedFuture(ex); - } + return getAmqpAdapterClientFactory(tenantId).compose(factory -> factory.createCommandConsumer(messageHandler)); + } - private TenantConnections getTenantConnections(final String tenantId) throws IllegalArgumentException { + private Future getTenantConnections(final String tenantId) throws IllegalArgumentException { final TenantConnections tenantConnections = connectionsPerTenant.get(tenantId); if (tenantConnections == null) { - throw new IllegalArgumentException("tenant [" + tenantId + "] is not connected"); + return Future.failedFuture("tenant [" + tenantId + "] is not connected"); } else { - return tenantConnections; + return Future.succeededFuture(tenantConnections); } } - private AmqpAdapterClientFactory getAmqpAdapterClientFactory(final String tenantId) + private Future getAmqpAdapterClientFactory(final String tenantId) throws IllegalStateException, IllegalArgumentException { - return getTenantConnections(tenantId).getAmqpAdapterClientFactory(); + return getTenantConnections(tenantId) + .compose(tenantConnections -> Future.succeededFuture(tenantConnections.getAmqpAdapterClientFactory())); } } diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java index 9e62baf9..e86b1146 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java @@ -111,8 +111,6 @@ public boolean closeEndpoint(final MqttEndpoint mqttEndpoint) { /** * Closes all MQTT endpoints and the AMQP connection. - * - * @throws IllegalStateException if this instance is already closed. */ public void closeAllConnections() { log.info("closing all AMQP connections"); @@ -132,7 +130,7 @@ private void closeEndpointIfConnected(final MqttEndpoint mqttEndpoint) { } private void closeThisInstance() { - getAmqpAdapterClientFactory().disconnect(); + amqpAdapterClientFactory.disconnect(); closed = true; } diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java index 5d0d9281..c0ce6eab 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java @@ -109,7 +109,8 @@ public void setUp() { tenantConnectionManager = mock(MultiTenantConnectionManager.class); when(tenantConnectionManager.connect(anyString(), any(), any())).thenReturn(Future.succeededFuture()); - when(tenantConnectionManager.closeEndpoint(anyString(), any())).thenReturn(true); + when(tenantConnectionManager.addEndpoint(anyString(), any())).thenReturn(Future.succeededFuture()); + when(tenantConnectionManager.closeEndpoint(anyString(), any())).thenReturn(Future.succeededFuture(true)); amqpClientConfig = new ClientConfigProperties(); final HonoConnection connection = mockHonoConnection(vertx, amqpClientConfig, protonSender); diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java index d5ae1030..dc0d2d91 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java @@ -18,7 +18,6 @@ import static org.mockito.Mockito.when; import org.eclipse.hono.config.ClientConfigProperties; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,7 +56,7 @@ public void amqpConnectionIsClosedWhenClosingLastEndpoint() { connectionManager.connect(TENANT_ID, vertx, new ClientConfigProperties()); connectionManager.addEndpoint(TENANT_ID, endpoint); - assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint)).isTrue(); + assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).result()).isTrue(); } @@ -71,7 +70,7 @@ public void amqpConnectionIsOpenWhenClosingEndpointThatIsNotTheLastOne() { connectionManager.addEndpoint(TENANT_ID, endpoint); connectionManager.addEndpoint(TENANT_ID, mock(MqttEndpoint.class)); - assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint)).isFalse(); + assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).result()).isFalse(); } @@ -85,26 +84,23 @@ public void addEndpointFailsIfInstanceIsClosed() { connectionManager.closeAllTenants(); - Assertions.assertThrows(IllegalArgumentException.class, - () -> connectionManager.addEndpoint(TENANT_ID, endpoint)); + assertThat(connectionManager.addEndpoint(TENANT_ID, endpoint).failed()).isTrue(); } /** - * Verifies that trying to add an endpoint without connecting the tenant first, throws an exception. + * Verifies that trying to add an endpoint without connecting the tenant first fails. */ @Test public void addEndpointFailsIfNotConnected() { - Assertions.assertThrows(IllegalArgumentException.class, - () -> connectionManager.addEndpoint(TENANT_ID, endpoint)); + assertThat(connectionManager.addEndpoint(TENANT_ID, endpoint).failed()).isTrue(); } /** - * Verifies that trying to close an endpoint without connecting the tenant first, throws an exception. + * Verifies that trying to close an endpoint without connecting the tenant first fails. */ @Test public void closeEndpointFailsIfNotConnected() { - Assertions.assertThrows(IllegalArgumentException.class, - () -> connectionManager.closeEndpoint(TENANT_ID, endpoint)); + assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).failed()).isTrue(); } /** @@ -114,20 +110,17 @@ public void closeEndpointFailsIfNotConnected() { @Test public void futureFailsIfNotConnected() { - assertThat(connectionManager.getOrCreateTelemetrySender(TENANT_ID).cause()) - .isInstanceOf(IllegalArgumentException.class); + assertThat(connectionManager.getOrCreateTelemetrySender(TENANT_ID).failed()).isTrue(); - assertThat(connectionManager.getOrCreateEventSender(TENANT_ID).cause()) - .isInstanceOf(IllegalArgumentException.class); + assertThat(connectionManager.getOrCreateEventSender(TENANT_ID).failed()).isTrue(); - assertThat(connectionManager.getOrCreateCommandResponseSender(TENANT_ID).cause()) - .isInstanceOf(IllegalArgumentException.class); + assertThat(connectionManager.getOrCreateCommandResponseSender(TENANT_ID).failed()).isTrue(); assertThat(connectionManager.createDeviceSpecificCommandConsumer(TENANT_ID, "device-id", msg -> { - }).cause()).isInstanceOf(IllegalArgumentException.class); + }).failed()).isTrue(); assertThat(connectionManager.createCommandConsumer(TENANT_ID, msg -> { - }).cause()).isInstanceOf(IllegalArgumentException.class); + }).failed()).isTrue(); }