Skip to content

Commit

Permalink
[eclipse-hono#5] Requested changes.
Browse files Browse the repository at this point in the history
Some minor changes like log statements and JavaDoc.
+ Refactoring of some methods in TenantConnections to return futures to indicate success or failure.

Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
  • Loading branch information
b-abel committed Sep 10, 2020
1 parent ab79d77 commit c446332
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ private void cleanupConnections(final MqttEndpoint endpoint,
tenantConnectionManager.closeEndpoint(tenantId, endpoint)
.onSuccess(amqpLinkClosed -> {
if (amqpLinkClosed) {
log.info("closing AMQP connection for tenant [{}]", tenantId);
log.info("closed AMQP connection for tenant [{}]", tenantId);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public interface MultiTenantConnectionManager {

/**
* Connect to Hono's AMQP adapter with the given configuration.
* Connects to Hono's AMQP adapter with the given configuration.
*
* @param tenantId The tenant to connect.
* @param vertx The Vert.x instance to use for the connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public class MultiTenantConnectionManagerImpl implements MultiTenantConnectionMa
@Override
public Future<Void> connect(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) {

connectionsPerTenant.computeIfAbsent(tenantId, k -> new TenantConnections(k, vertx, clientConfig).connect());
connectionsPerTenant.computeIfAbsent(tenantId, k -> {
final TenantConnections tenantConnections = new TenantConnections(k, vertx, clientConfig);
tenantConnections.connect();
return tenantConnections;
});

return getTenantConnections(tenantId)
.compose(tenantConnections -> tenantConnections.isConnected(clientConfig.getConnectTimeout()))
Expand All @@ -57,10 +61,7 @@ public Future<Void> connect(final String tenantId, final Vertx vertx, final Clie
@Override
public Future<Void> addEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) {
return getTenantConnections(tenantId)
.map(tenantConnections -> {
tenantConnections.addEndpoint(mqttEndpoint);
return null;
});
.compose(tenantConnections -> tenantConnections.addEndpoint(mqttEndpoint));
}

@Override
Expand Down Expand Up @@ -115,7 +116,7 @@ public Future<MessageConsumer> createCommandConsumer(final String tenantId,

}

private Future<TenantConnections> getTenantConnections(final String tenantId) throws IllegalArgumentException {
private Future<TenantConnections> getTenantConnections(final String tenantId) {
final TenantConnections tenantConnections = connectionsPerTenant.get(tenantId);
if (tenantConnections == null) {
return Future.failedFuture("tenant [" + tenantId + "] is not connected");
Expand All @@ -124,9 +125,7 @@ private Future<TenantConnections> getTenantConnections(final String tenantId) th
}
}

private Future<AmqpAdapterClientFactory> getAmqpAdapterClientFactory(final String tenantId)
throws IllegalStateException, IllegalArgumentException {
return getTenantConnections(tenantId)
.compose(tenantConnections -> Future.succeededFuture(tenantConnections.getAmqpAdapterClientFactory()));
private Future<AmqpAdapterClientFactory> getAmqpAdapterClientFactory(final String tenantId) {
return getTenantConnections(tenantId).compose(TenantConnections::getAmqpAdapterClientFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.List;

import org.eclipse.hono.client.ConnectionLifecycle;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory;
Expand Down Expand Up @@ -44,6 +45,7 @@ class TenantConnections {

private final AmqpAdapterClientFactory amqpAdapterClientFactory;
private final Logger log = LoggerFactory.getLogger(getClass());
private final String tenantId;

private boolean closed = false;

Expand All @@ -55,37 +57,40 @@ class TenantConnections {
* @param clientConfig The client configuration to be used by the HonoConnection.
*/
TenantConnections(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) {
this(AmqpAdapterClientFactory.create(HonoConnection.newConnection(vertx, clientConfig), tenantId));
this(AmqpAdapterClientFactory.create(HonoConnection.newConnection(vertx, clientConfig), tenantId), tenantId);
}

/**
* Creates a new instance for the given {@link AmqpAdapterClientFactory}.
* <p>
* <b>This constructor is for testing purposes only.</b>
*
* @param amqpAdapterClientFactory The AmqpAdapterClientFactory to use for creating AMQP clients.
* @param tenantId The ID of the tenant whose connections are to be managed
*/
TenantConnections(final AmqpAdapterClientFactory amqpAdapterClientFactory) {
TenantConnections(final AmqpAdapterClientFactory amqpAdapterClientFactory, final String tenantId) {
this.amqpAdapterClientFactory = amqpAdapterClientFactory;
this.tenantId = tenantId;
}

/**
* Opens a connection to Hono's AMQP protocol adapter for the tenant to be managed.
*
* @return This instance for fluent use.
* @throws IllegalStateException if this instance is already closed.
* @return A future indicating the outcome of the operation.
*/
public TenantConnections connect() {
getAmqpAdapterClientFactory().connect().onSuccess(con -> log.debug("Connected to AMQP adapter"));
return this;
public Future<HonoConnection> connect() {
return getAmqpAdapterClientFactory().compose(ConnectionLifecycle::connect)
.onSuccess(con -> log.debug("Connected to AMQP adapter"));
}

/**
* Adds an MQTT endpoint for the tenant.
*
* @param mqttEndpoint The endpoint to add.
* @return A future indicating the outcome of the operation.
*/
public void addEndpoint(final MqttEndpoint mqttEndpoint) {
checkNotClosed();
mqttEndpoints.add(mqttEndpoint);
public Future<Void> addEndpoint(final MqttEndpoint mqttEndpoint) {
return failIfClosed().onSuccess(v -> mqttEndpoints.add(mqttEndpoint));
}

/**
Expand All @@ -94,7 +99,6 @@ public void addEndpoint(final MqttEndpoint mqttEndpoint) {
*
* @param mqttEndpoint The endpoint to be closed.
* @return {@code true} if the AMQP connection has been closed.
* @throws IllegalStateException if this instance is already closed.
*/
public boolean closeEndpoint(final MqttEndpoint mqttEndpoint) {

Expand Down Expand Up @@ -139,27 +143,29 @@ private void closeThisInstance() {
*
* @param connectTimeout The maximum number of milliseconds to wait for an ongoing connection attempt to finish.
* @return A succeeded future if this connection is established. Otherwise, the future will be failed with a
* {@link ServerErrorException}.
* @throws IllegalStateException if this instance is already closed.
* {@link ServerErrorException}, or an {@link IllegalStateException} if this instance is already closed.
*/
public Future<Void> isConnected(final long connectTimeout) {
return getAmqpAdapterClientFactory().isConnected(connectTimeout);
return getAmqpAdapterClientFactory().compose(f -> f.isConnected(connectTimeout));
}

/**
* Returns the AmqpAdapterClientFactory for the tenant.
*
* @return The AmqpAdapterClientFactory.
* @throws IllegalStateException if this instance is already closed.
* @return A future containing the AmqpAdapterClientFactory, or, if this instance is already closed, a failed
* future.
*/
public AmqpAdapterClientFactory getAmqpAdapterClientFactory() throws IllegalStateException {
checkNotClosed();
return amqpAdapterClientFactory;
public Future<AmqpAdapterClientFactory> getAmqpAdapterClientFactory() {
return failIfClosed().map(amqpAdapterClientFactory);
}

private void checkNotClosed() throws IllegalStateException {
private Future<Void> failIfClosed() {
if (closed) {
throw new IllegalStateException("all connections for this tenant are already closed");
final Exception ex = new IllegalStateException("connections for this tenant are already closed");
log.warn("This should not happen", ex);
return Future.failedFuture(ex);
} else {
return Future.succeededFuture();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
package org.eclipse.hono.gateway.sdk.mqtt2amqp;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -44,20 +43,10 @@ public class TenantConnectionsTest {
public void setUp() {
amqpAdapterClientFactory = mock(AmqpAdapterClientFactory.class);

tenantConnections = new TenantConnections(amqpAdapterClientFactory);
tenantConnections = new TenantConnections(amqpAdapterClientFactory, "a-tenant-id");
endpoint = mock(MqttEndpoint.class);
}

/**
* Verifies that the connect method returns the instance.
*/
@Test
public void connectReturnsTheInstance() {
when(amqpAdapterClientFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class)));

assertThat(tenantConnections.connect()).isEqualTo(tenantConnections);
}

/**
* Verifies that adding an endpoint works.
*/
Expand Down Expand Up @@ -90,7 +79,7 @@ public void instanceIsClosedWhenClosingLastEndpoint() {

tenantConnections.closeEndpoint(endpoint);

Assertions.assertThrows(IllegalStateException.class, () -> tenantConnections.getAmqpAdapterClientFactory());
assertThat(tenantConnections.getAmqpAdapterClientFactory().failed()).isTrue();
}

/**
Expand All @@ -103,7 +92,7 @@ public void instanceIsOpenWhenClosingEndpointThatIsNotTheLastOne() {

tenantConnections.closeEndpoint(endpoint);

assertThat(tenantConnections.getAmqpAdapterClientFactory()).isNotNull();
assertThat(tenantConnections.getAmqpAdapterClientFactory().succeeded()).isTrue();
}

/**
Expand All @@ -116,16 +105,30 @@ public void instanceIsClosedWhenInvokingClose() {

tenantConnections.closeAllConnections();

Assertions.assertThrows(IllegalStateException.class, () -> tenantConnections.getAmqpAdapterClientFactory());
assertThat(tenantConnections.getAmqpAdapterClientFactory().failed()).isTrue();
}

/**
* Verifies that the isConnected() method delegates the check to the client factory.
*/
@Test
public void isConnectedDelegatesToClientFactory() {
when(amqpAdapterClientFactory.isConnected(anyLong())).thenReturn(Future.succeededFuture());

tenantConnections.isConnected(5L);
verify(amqpAdapterClientFactory).isConnected(eq(5L));
}

/**
* Verifies that the connect() method delegates the call to the client factory.
*/
@Test
public void connectDelegatesToClientFactory() {
when(amqpAdapterClientFactory.connect()).thenReturn(Future.succeededFuture());

tenantConnections.connect();
verify(amqpAdapterClientFactory).connect();

}

}

0 comments on commit c446332

Please sign in to comment.