Skip to content

Commit

Permalink
[eclipse-hono#5] Refactored MultiTenantConnectionManager.
Browse files Browse the repository at this point in the history
MultiTenantConnectionManager.addEndpoint() and MultiTenantConnectionManager.closeEndpoint()
now return a future instead of throwing exceptions.

Closing TenantConnections no longer throws an exception if the instance is already closed.

Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
  • Loading branch information
b-abel committed Aug 24, 2020
1 parent 48740b1 commit ab79d77
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,9 @@ private Future<Void> connectGatewayToAmqpAdapter(final String tenantId,
final ClientConfigProperties clientConfig,
final MqttEndpoint endpoint) {
return tenantConnectionManager.connect(tenantId, vertx, clientConfig)
.onSuccess(v -> tenantConnectionManager.addEndpoint(tenantId, endpoint))
.onFailure(e -> log.info("Failed to connect to Hono [tenant-id: {}, username: {}]", tenantId,
clientConfig.getUsername()));
clientConfig.getUsername()))
.compose(v -> tenantConnectionManager.addEndpoint(tenantId, endpoint));

}

Expand Down Expand Up @@ -484,11 +484,12 @@ private void cleanupConnections(final MqttEndpoint endpoint,
cmdSubscriptionsManager.removeAllSubscriptions();

final String tenantId = authenticatedDevice.getTenantId();
final boolean amqpLinkClosed = tenantConnectionManager.closeEndpoint(tenantId, endpoint);

if (amqpLinkClosed) {
log.info("closing AMQP connection for tenant [{}]", tenantId);
}
tenantConnectionManager.closeEndpoint(tenantId, endpoint)
.onSuccess(amqpLinkClosed -> {
if (amqpLinkClosed) {
log.info("closing AMQP connection for tenant [{}]", tenantId);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

/**
* Manages connections for multiple tenants.
* <p>
* <b>NB</b> The {@link #connect(String, Vertx, ClientConfigProperties)} method needs to be invoked before calling any
* of the other methods.
*/
public interface MultiTenantConnectionManager {

Expand All @@ -48,18 +51,22 @@ public interface MultiTenantConnectionManager {
*
* @param tenantId The tenant to which the endpoint belongs.
* @param mqttEndpoint The endpoint to be added.
* @return A future indicating the outcome of the operation. The future will succeed if the endpoint has been added
* successfully. Otherwise the future will fail with a failure message indicating the cause of the failure.
*/
void addEndpoint(String tenantId, MqttEndpoint mqttEndpoint);
Future<Void> addEndpoint(String tenantId, MqttEndpoint mqttEndpoint);

/**
* Closes the given MQTT endpoint and if there are no other open endpoints for this tenant, it closes the
* corresponding AMQP connection.
*
* @param tenantId The tenant to which the endpoint belongs.
* @param mqttEndpoint The endpoint to be closed.
* @return {@code true} if the AMQP connection and all endpoints have been closed.
* @return A future indicating the outcome of the operation. The future will succeed with a boolean that is
* {@code true} if the AMQP connection (and all MQTT endpoints) have been closed. If an error occurs, the
* future will fail with a failure message indicating the cause of the failure.
*/
boolean closeEndpoint(String tenantId, MqttEndpoint mqttEndpoint);
Future<Boolean> closeEndpoint(String tenantId, MqttEndpoint mqttEndpoint);

/**
* Closes all connections, MQTT connections as well as AMQP connections for all tenants.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Future<Void> connect(final String tenantId, final Vertx vertx, final Clie
connectionsPerTenant.computeIfAbsent(tenantId, k -> new TenantConnections(k, vertx, clientConfig).connect());

return getTenantConnections(tenantId)
.isConnected(clientConfig.getConnectTimeout())
.compose(tenantConnections -> tenantConnections.isConnected(clientConfig.getConnectTimeout()))
.onFailure(ex -> {
final TenantConnections failedTenant = connectionsPerTenant.remove(tenantId);
if (failedTenant != null) {
Expand All @@ -55,19 +55,25 @@ public Future<Void> connect(final String tenantId, final Vertx vertx, final Clie
}

@Override
public void addEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) {
getTenantConnections(tenantId).addEndpoint(mqttEndpoint);
public Future<Void> addEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) {
return getTenantConnections(tenantId)
.map(tenantConnections -> {
tenantConnections.addEndpoint(mqttEndpoint);
return null;
});
}

@Override
public boolean closeEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) {
public Future<Boolean> closeEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) {

final boolean amqpLinkClosed = getTenantConnections(tenantId).closeEndpoint(mqttEndpoint);
if (amqpLinkClosed) {
connectionsPerTenant.remove(tenantId);
}
return getTenantConnections(tenantId)
.map(tenantConnections -> tenantConnections.closeEndpoint(mqttEndpoint))
.onSuccess(amqpLinkClosed -> {
if (amqpLinkClosed) {
connectionsPerTenant.remove(tenantId);
}
});

return amqpLinkClosed;
}

@Override
Expand All @@ -78,64 +84,49 @@ public void closeAllTenants() {

@Override
public Future<TelemetrySender> getOrCreateTelemetrySender(final String tenantId) {
try {
return getAmqpAdapterClientFactory(tenantId).getOrCreateTelemetrySender();
} catch (Exception ex) {
return Future.failedFuture(ex);
}
return getAmqpAdapterClientFactory(tenantId).compose(AmqpAdapterClientFactory::getOrCreateTelemetrySender);
}

@Override
public Future<EventSender> getOrCreateEventSender(final String tenantId) {
try {
return getAmqpAdapterClientFactory(tenantId).getOrCreateEventSender();
} catch (Exception ex) {
return Future.failedFuture(ex);
}
return getAmqpAdapterClientFactory(tenantId).compose(AmqpAdapterClientFactory::getOrCreateEventSender);
}

@Override
public Future<CommandResponder> getOrCreateCommandResponseSender(final String tenantId) {
try {
return getAmqpAdapterClientFactory(tenantId).getOrCreateCommandResponseSender();
} catch (Exception ex) {
return Future.failedFuture(ex);
}
return getAmqpAdapterClientFactory(tenantId)
.compose(AmqpAdapterClientFactory::getOrCreateCommandResponseSender);
}

@Override
public Future<MessageConsumer> createDeviceSpecificCommandConsumer(final String tenantId, final String deviceId,
final Consumer<Message> messageHandler) {

try {
return getAmqpAdapterClientFactory(tenantId).createDeviceSpecificCommandConsumer(deviceId, messageHandler);
} catch (Exception ex) {
return Future.failedFuture(ex);
}
return getAmqpAdapterClientFactory(tenantId)
.compose(factory -> factory.createDeviceSpecificCommandConsumer(deviceId, messageHandler));

}

@Override
public Future<MessageConsumer> createCommandConsumer(final String tenantId,
final Consumer<Message> messageHandler) {

try {
return getAmqpAdapterClientFactory(tenantId).createCommandConsumer(messageHandler);
} catch (Exception ex) {
return Future.failedFuture(ex);
}
return getAmqpAdapterClientFactory(tenantId).compose(factory -> factory.createCommandConsumer(messageHandler));

}

private TenantConnections getTenantConnections(final String tenantId) throws IllegalArgumentException {
private Future<TenantConnections> getTenantConnections(final String tenantId) throws IllegalArgumentException {
final TenantConnections tenantConnections = connectionsPerTenant.get(tenantId);
if (tenantConnections == null) {
throw new IllegalArgumentException("tenant [" + tenantId + "] is not connected");
return Future.failedFuture("tenant [" + tenantId + "] is not connected");
} else {
return tenantConnections;
return Future.succeededFuture(tenantConnections);
}
}

private AmqpAdapterClientFactory getAmqpAdapterClientFactory(final String tenantId)
private Future<AmqpAdapterClientFactory> getAmqpAdapterClientFactory(final String tenantId)
throws IllegalStateException, IllegalArgumentException {
return getTenantConnections(tenantId).getAmqpAdapterClientFactory();
return getTenantConnections(tenantId)
.compose(tenantConnections -> Future.succeededFuture(tenantConnections.getAmqpAdapterClientFactory()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ public boolean closeEndpoint(final MqttEndpoint mqttEndpoint) {

/**
* Closes all MQTT endpoints and the AMQP connection.
*
* @throws IllegalStateException if this instance is already closed.
*/
public void closeAllConnections() {
log.info("closing all AMQP connections");
Expand All @@ -132,7 +130,7 @@ private void closeEndpointIfConnected(final MqttEndpoint mqttEndpoint) {
}

private void closeThisInstance() {
getAmqpAdapterClientFactory().disconnect();
amqpAdapterClientFactory.disconnect();
closed = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void setUp() {

tenantConnectionManager = mock(MultiTenantConnectionManager.class);
when(tenantConnectionManager.connect(anyString(), any(), any())).thenReturn(Future.succeededFuture());
when(tenantConnectionManager.closeEndpoint(anyString(), any())).thenReturn(true);
when(tenantConnectionManager.addEndpoint(anyString(), any())).thenReturn(Future.succeededFuture());
when(tenantConnectionManager.closeEndpoint(anyString(), any())).thenReturn(Future.succeededFuture(true));

amqpClientConfig = new ClientConfigProperties();
final HonoConnection connection = mockHonoConnection(vertx, amqpClientConfig, protonSender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.mockito.Mockito.when;

import org.eclipse.hono.config.ClientConfigProperties;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -57,7 +56,7 @@ public void amqpConnectionIsClosedWhenClosingLastEndpoint() {
connectionManager.connect(TENANT_ID, vertx, new ClientConfigProperties());
connectionManager.addEndpoint(TENANT_ID, endpoint);

assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint)).isTrue();
assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).result()).isTrue();

}

Expand All @@ -71,7 +70,7 @@ public void amqpConnectionIsOpenWhenClosingEndpointThatIsNotTheLastOne() {
connectionManager.addEndpoint(TENANT_ID, endpoint);
connectionManager.addEndpoint(TENANT_ID, mock(MqttEndpoint.class));

assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint)).isFalse();
assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).result()).isFalse();

}

Expand All @@ -85,26 +84,23 @@ public void addEndpointFailsIfInstanceIsClosed() {

connectionManager.closeAllTenants();

Assertions.assertThrows(IllegalArgumentException.class,
() -> connectionManager.addEndpoint(TENANT_ID, endpoint));
assertThat(connectionManager.addEndpoint(TENANT_ID, endpoint).failed()).isTrue();
}

/**
* Verifies that trying to add an endpoint without connecting the tenant first, throws an exception.
* Verifies that trying to add an endpoint without connecting the tenant first fails.
*/
@Test
public void addEndpointFailsIfNotConnected() {
Assertions.assertThrows(IllegalArgumentException.class,
() -> connectionManager.addEndpoint(TENANT_ID, endpoint));
assertThat(connectionManager.addEndpoint(TENANT_ID, endpoint).failed()).isTrue();
}

/**
* Verifies that trying to close an endpoint without connecting the tenant first, throws an exception.
* Verifies that trying to close an endpoint without connecting the tenant first fails.
*/
@Test
public void closeEndpointFailsIfNotConnected() {
Assertions.assertThrows(IllegalArgumentException.class,
() -> connectionManager.closeEndpoint(TENANT_ID, endpoint));
assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).failed()).isTrue();
}

/**
Expand All @@ -114,20 +110,17 @@ public void closeEndpointFailsIfNotConnected() {
@Test
public void futureFailsIfNotConnected() {

assertThat(connectionManager.getOrCreateTelemetrySender(TENANT_ID).cause())
.isInstanceOf(IllegalArgumentException.class);
assertThat(connectionManager.getOrCreateTelemetrySender(TENANT_ID).failed()).isTrue();

assertThat(connectionManager.getOrCreateEventSender(TENANT_ID).cause())
.isInstanceOf(IllegalArgumentException.class);
assertThat(connectionManager.getOrCreateEventSender(TENANT_ID).failed()).isTrue();

assertThat(connectionManager.getOrCreateCommandResponseSender(TENANT_ID).cause())
.isInstanceOf(IllegalArgumentException.class);
assertThat(connectionManager.getOrCreateCommandResponseSender(TENANT_ID).failed()).isTrue();

assertThat(connectionManager.createDeviceSpecificCommandConsumer(TENANT_ID, "device-id", msg -> {
}).cause()).isInstanceOf(IllegalArgumentException.class);
}).failed()).isTrue();

assertThat(connectionManager.createCommandConsumer(TENANT_ID, msg -> {
}).cause()).isInstanceOf(IllegalArgumentException.class);
}).failed()).isTrue();

}

Expand Down

0 comments on commit ab79d77

Please sign in to comment.