From a50dc6512270478e16e7ab88a10edc6eee0922ba Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Tue, 19 Nov 2024 15:51:25 +0100 Subject: [PATCH] [#2955] Fix MQTT5 Connect Reason Codes MQTT5 defines new reason codes to be included in CONNACK packets when connection establishment fails. The abstract adapter base class has been changed accordingly. Also added integration tests based on HiveMQ client for testing connection establishment. --- ...AbstractVertxBasedMqttProtocolAdapter.java | 36 +- bom/pom.xml | 6 + legal/src/main/resources/legal/DEPENDENCIES | 60 +- tests/pom.xml | 5 + .../eclipse/hono/tests/mqtt/MqttTestBase.java | 6 +- .../hono/tests/mqtt5/MqttConnectionIT.java | 884 ++++++++++++++++++ .../hono/tests/mqtt5/MqttTestBase.java | 285 ++++++ 7 files changed, 1234 insertions(+), 48 deletions(-) create mode 100644 tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java create mode 100644 tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java diff --git a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java index a7c611a19e..d609defb1c 100644 --- a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java +++ b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java @@ -90,6 +90,7 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttVersion; import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.log.Fields; @@ -518,7 +519,7 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) { log.debug("rejecting connection request from client [clientId: {}], cause:", endpoint.clientIdentifier(), t); - final MqttConnectReturnCode code = getConnectReturnCode(t); + final MqttConnectReturnCode code = getConnectReturnCode(t, endpoint); rejectConnectionRequest(endpoint, code, span); TracingHelper.logError(span, t); } @@ -1106,28 +1107,33 @@ final MqttDeviceEndpoint createMqttDeviceEndpoint( return mqttDeviceEndpoint; } - private static MqttConnectReturnCode getConnectReturnCode(final Throwable e) { + private static MqttConnectReturnCode getConnectReturnCode(final Throwable e, final MqttEndpoint endpoint) { - if (e instanceof MqttConnectionException) { - return ((MqttConnectionException) e).code(); + final boolean isPreMqtt5 = ((int) MqttVersion.MQTT_5.protocolLevel()) > endpoint.protocolVersion(); + if (e instanceof MqttConnectionException connectionException) { + return connectionException.code(); + } else if (e instanceof AdapterConnectionsExceededException) { + return isPreMqtt5 ? MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE + : MqttConnectReturnCode.CONNECTION_REFUSED_QUOTA_EXCEEDED; } else if (e instanceof AuthorizationException) { - if (e instanceof AdapterConnectionsExceededException) { - return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE; - } else { - return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; - } - } else if (e instanceof ServiceInvocationException) { - switch (((ServiceInvocationException) e).getErrorCode()) { + return isPreMqtt5 ? MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED + : MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; + } else if (e instanceof ServiceInvocationException exception) { + switch (exception.getErrorCode()) { case HttpURLConnection.HTTP_UNAUTHORIZED: case HttpURLConnection.HTTP_NOT_FOUND: - return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD; + return isPreMqtt5 ? MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD + : MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; case HttpURLConnection.HTTP_UNAVAILABLE: - return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE; + return isPreMqtt5 ? MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE + : MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; default: - return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; + return isPreMqtt5 ? MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED + : MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; } } else { - return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; + return isPreMqtt5 ? MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED + : MqttConnectReturnCode.CONNECTION_REFUSED_UNSPECIFIED_ERROR; } } diff --git a/bom/pom.xml b/bom/pom.xml index 423fbf01d1..7339b2e7ad 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -595,6 +595,12 @@ quarkus.vertx.max-event-loop-execute-time=${max.event-loop.execute-time:20000} + + com.hivemq + hivemq-mqtt-client + 1.3.3 + test + org.eclipse.hono core-test-utils diff --git a/legal/src/main/resources/legal/DEPENDENCIES b/legal/src/main/resources/legal/DEPENDENCIES index 4f2258ec24..34cb06ef75 100644 --- a/legal/src/main/resources/legal/DEPENDENCIES +++ b/legal/src/main/resources/legal/DEPENDENCIES @@ -1,6 +1,6 @@ maven/mavencentral/biz.paluch.logging/logstash-gelf/1.15.1, MIT, approved, clearlydefined -maven/mavencentral/ch.qos.logback/logback-classic/1.5.6, EPL-1.0 AND LGPL-2.1-only, approved, #13282 -maven/mavencentral/ch.qos.logback/logback-core/1.5.6, EPL-1.0 AND LGPL-2.1-only, approved, #13283 +maven/mavencentral/ch.qos.logback/logback-classic/1.5.6, EPL-1.0 AND LGPL-2.1-only, approved, #15279 +maven/mavencentral/ch.qos.logback/logback-core/1.5.6, EPL-1.0 AND LGPL-2.1-only, approved, #15210 maven/mavencentral/com.aayushatharva.brotli4j/brotli4j/1.16.0, Apache-2.0, approved, clearlydefined maven/mavencentral/com.aayushatharva.brotli4j/native-linux-x86_64/1.16.0, Apache-2.0, approved, clearlydefined maven/mavencentral/com.aayushatharva.brotli4j/service/1.16.0, Apache-2.0, approved, clearlydefined @@ -27,7 +27,7 @@ maven/mavencentral/com.google.auth/google-auth-library-oauth2-http/1.23.0, BSD-3 maven/mavencentral/com.google.auto.value/auto-value-annotations/1.10.4, Apache-2.0, approved, clearlydefined maven/mavencentral/com.google.cloud/google-cloud-core/2.35.0, Apache-2.0, approved, clearlydefined maven/mavencentral/com.google.cloud/google-cloud-pubsub/1.127.1, Apache-2.0, approved, clearlydefined -maven/mavencentral/com.google.code.findbugs/jsr305/3.0.2, Apache-2.0, approved, #20 +maven/mavencentral/com.google.code.findbugs/jsr305/3.0.2, Apache-2.0 and CC-BY-2.5, approved, #15220 maven/mavencentral/com.google.code.gson/gson/2.10.1, Apache-2.0, approved, #6159 maven/mavencentral/com.googlecode.juniversalchardet/juniversalchardet/1.0.3, MPL-1.1, approved, CQ10305 maven/mavencentral/com.google.errorprone/error_prone_annotations/2.24.0, Apache-2.0, approved, #12448 @@ -45,7 +45,7 @@ maven/mavencentral/com.mchange/c3p0/0.9.5.5, LGPL-2.1-only OR EPL-1.0, approved, maven/mavencentral/com.mchange/mchange-commons-java/0.2.19, LGPL-2.1-only OR EPL-1.0, approved, clearlydefined maven/mavencentral/commons-codec/commons-codec/1.16.1, Apache-2.0 AND (Apache-2.0 AND BSD-3-Clause), approved, #9157 maven/mavencentral/commons-io/commons-io/2.15.1, Apache-2.0, approved, #11244 -maven/mavencentral/com.squareup.okhttp3/okhttp/4.12.0, Apache-2.0, approved, #11156 +maven/mavencentral/com.squareup.okhttp3/okhttp/4.12.0, Apache-2.0, approved, #15227 maven/mavencentral/com.squareup.okio/okio/3.6.0, Apache-2.0, approved, #11155 maven/mavencentral/com.squareup.okio/okio-jvm/3.6.0, Apache-2.0, approved, #11158 maven/mavencentral/com.squareup/protoparser/4.0.3, Apache-2.0, approved, clearlydefined @@ -249,29 +249,29 @@ maven/mavencentral/io.smallrye/smallrye-fault-tolerance-vertx/6.2.6, Apache-2.0, maven/mavencentral/io.smallrye/smallrye-health/4.1.0, Apache-2.0, approved, clearlydefined maven/mavencentral/io.smallrye/smallrye-health-api/4.1.0, Apache-2.0, approved, clearlydefined maven/mavencentral/io.smallrye/smallrye-health-provided-checks/4.1.0, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-auth-common/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-auth-jdbc/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-auth-mongo/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-bridge-common/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-codegen/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-core/4.5.7, Apache-2.0 AND EPL-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-grpc/4.5.7, Apache-2.0 AND EPL-1.0 AND Apache-2.0, approved, #14668 -maven/mavencentral/io.vertx/vertx-grpc-client/4.5.7, EPL-2.0 OR Apache-2.0, approved, #14667 -maven/mavencentral/io.vertx/vertx-grpc-common/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-grpc-server/4.5.7, EPL-2.0 OR Apache-2.0, approved, #14669 -maven/mavencentral/io.vertx/vertx-health-check/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-jdbc-client/4.5.7, Apache-2.0 AND EPL-1.0 AND (EPL-2.0 OR Apache-2.0) AND Apache-2.0, approved, #14671 -maven/mavencentral/io.vertx/vertx-junit5/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-kafka-client/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-micrometer-metrics/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-mongo-client/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-mqtt/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-proton/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-sql-client/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-uri-template/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-web/4.5.7, Apache-2.0 AND EPL-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-web-client/4.5.7, Apache-2.0, approved, clearlydefined -maven/mavencentral/io.vertx/vertx-web-common/4.5.7, Apache-2.0, approved, clearlydefined +maven/mavencentral/io.vertx/vertx-auth-common/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-auth-jdbc/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-auth-mongo/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-bridge-common/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-codegen/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-core/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-grpc/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-grpc-client/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-grpc-common/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-grpc-server/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-health-check/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-jdbc-client/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-junit5/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-kafka-client/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-micrometer-metrics/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-mongo-client/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-mqtt/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-proton/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-sql-client/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-uri-template/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-web/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-web-client/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx +maven/mavencentral/io.vertx/vertx-web-common/4.5.7, EPL-2.0 OR Apache-2.0, approved, rt.vertx maven/mavencentral/jakarta.activation/jakarta.activation-api/2.1.2, EPL-2.0 OR BSD-3-Clause OR GPL-2.0-only with Classpath-exception-2.0, approved, ee4j.jaf maven/mavencentral/jakarta.annotation/jakarta.annotation-api/2.1.1, EPL-2.0 OR GPL-2.0-only with Classpath-exception-2.0, approved, ee4j.ca maven/mavencentral/jakarta.el/jakarta.el-api/5.0.1, EPL-2.0 OR GPL-2.0-only with Classpath-exception-2.0, approved, ee4j.el @@ -288,7 +288,7 @@ maven/mavencentral/net.java.dev.jna/jna/5.8.0, Apache-2.0 OR LGPL-2.1-or-later, maven/mavencentral/org.apache.commons/commons-collections4/4.4, Apache-2.0, approved, clearlydefined maven/mavencentral/org.apache.commons/commons-compress/1.26.1, Apache-2.0 AND (Apache-2.0 AND BSD-3-Clause), approved, #13288 maven/mavencentral/org.apache.commons/commons-lang3/3.14.0, Apache-2.0, approved, #11677 -maven/mavencentral/org.apache.httpcomponents/httpclient/4.5.14, Apache-2.0 AND LicenseRef-Public-Domain, approved, CQ23527 +maven/mavencentral/org.apache.httpcomponents/httpclient/4.5.14, Apache-2.0, approved, #15248 maven/mavencentral/org.apache.httpcomponents/httpcore/4.4.16, Apache-2.0, approved, CQ23528 maven/mavencentral/org.apache.kafka/kafka-clients/3.6.1, Apache-2.0 AND (Apache-2.0 AND MIT) AND (Apache-2.0 AND BSD-3-Clause), approved, #11084 maven/mavencentral/org.apache.logging.log4j/log4j-api/2.22.1, Apache-2.0, approved, #12576 @@ -312,7 +312,7 @@ maven/mavencentral/org.eclipse.parsson/parsson/1.1.5, EPL-2.0, approved, ee4j.pa maven/mavencentral/org.fusesource.jansi/jansi/2.4.0, Apache-2.0, approved, clearlydefined maven/mavencentral/org.graalvm.sdk/nativeimage/23.1.2, UPL-1.0, approved, #10921 maven/mavencentral/org.graalvm.sdk/word/23.1.2, UPL-1.0, approved, #10917 -maven/mavencentral/org.hdrhistogram/HdrHistogram/2.1.12, BSD-2-Clause OR LicenseRef-Public-Domain, approved, CQ13192 +maven/mavencentral/org.hdrhistogram/HdrHistogram/2.1.12, CC0-1.0, approved, #15259 maven/mavencentral/org.infinispan/infinispan-client-hotrod-jakarta/14.0.27.Final, Apache-2.0, approved, #10936 maven/mavencentral/org.infinispan/infinispan-commons-jakarta/14.0.27.Final, Apache-2.0 AND LicenseRef-Public-Domain, approved, #10937 maven/mavencentral/org.infinispan/infinispan-core-jakarta/14.0.27.Final, Apache-2.0, approved, #10938 @@ -338,7 +338,7 @@ maven/mavencentral/org.jetbrains.kotlin/kotlin-stdlib-jdk7/1.9.22, Apache-2.0, a maven/mavencentral/org.jetbrains.kotlin/kotlin-stdlib-jdk8/1.9.22, Apache-2.0, approved, #14185 maven/mavencentral/org.jgroups/jgroups/5.2.23.Final, Apache-2.0, approved, clearlydefined maven/mavencentral/org.jline/jline/3.26.1, BSD-3-Clause AND Apache-2.0, approved, #14872 -maven/mavencentral/org.latencyutils/LatencyUtils/2.0.3, BSD-2-Clause, approved, CQ17408 +maven/mavencentral/org.latencyutils/LatencyUtils/2.0.3, CC0-1.0, approved, #15280 maven/mavencentral/org.locationtech.jts/jts-core/1.17.0, EPL-2.0, approved, locationtech.jts maven/mavencentral/org.lz4/lz4-java/1.8.0, Apache-2.0, approved, clearlydefined maven/mavencentral/org.mongodb/bson/4.11.1, Apache-2.0, approved, clearlydefined diff --git a/tests/pom.xml b/tests/pom.xml index 6592ae6c7f..6a218d582a 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -319,6 +319,11 @@ vertx-mqtt test + + com.hivemq + hivemq-mqtt-client + test + org.eclipse.californium californium-core diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java index d1c2775edf..b6039291c3 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java +++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -36,7 +36,7 @@ import io.vertx.mqtt.messages.MqttConnAckMessage; /** - * Base class for MQTT adapter integration tests. + * Base class for MQTT adapter integration tests using MQTT 3.1.1. * */ public abstract class MqttTestBase { @@ -70,7 +70,7 @@ public abstract class MqttTestBase { protected Context context; /** - * Creates default AMQP client options. + * Creates default MQTT client options. */ @BeforeAll public static void init() { diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java new file mode 100644 index 0000000000..206f6ed312 --- /dev/null +++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java @@ -0,0 +1,884 @@ +/******************************************************************************* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.tests.mqtt5; + +import static com.google.common.truth.Truth.assertThat; + +import java.net.HttpURLConnection; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.cert.X509Certificate; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import javax.security.auth.x500.X500Principal; + +import org.eclipse.hono.service.management.credentials.Credentials; +import org.eclipse.hono.service.management.credentials.PasswordCredential; +import org.eclipse.hono.service.management.credentials.X509CertificateCredential; +import org.eclipse.hono.service.management.credentials.X509CertificateSecret; +import org.eclipse.hono.service.management.device.Device; +import org.eclipse.hono.service.management.tenant.Tenant; +import org.eclipse.hono.tests.EnabledIfDnsRebindingIsSupported; +import org.eclipse.hono.tests.EnabledIfProtocolAdaptersAreRunning; +import org.eclipse.hono.tests.EnabledIfRegistrySupportsFeatures; +import org.eclipse.hono.tests.IntegrationTestSupport; +import org.eclipse.hono.tests.Tenants; +import org.eclipse.hono.util.Adapter; +import org.eclipse.hono.util.Constants; +import org.eclipse.hono.util.CredentialsConstants; +import org.eclipse.hono.util.IdentityTemplate; +import org.eclipse.hono.util.RegistrationConstants; +import org.eclipse.hono.util.RegistryManagementConstants; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener; +import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; + +import io.jsonwebtoken.Jwts; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.SelfSignedCertificate; +import io.vertx.junit5.Timeout; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +/** + * Integration tests for checking connection to the MQTT adapter. + * + */ +@ExtendWith(VertxExtension.class) +@Timeout(timeUnit = TimeUnit.SECONDS, value = 15) +@EnabledIfProtocolAdaptersAreRunning(mqttAdapter = true) +public class MqttConnectionIT extends MqttTestBase { + + private SelfSignedCertificate deviceCert; + private String tenantId; + private String deviceId; + private String password; + + /** + * Sets up the fixture. + */ + @BeforeEach + public void setUp() { + tenantId = helper.getRandomTenantId(); + deviceId = helper.getRandomDeviceId(tenantId); + password = "secret"; + deviceCert = SelfSignedCertificate.create(UUID.randomUUID().toString()); + } + + /** + * Verifies that the adapter opens a connection to registered devices with credentials. + * + * @param tlsVersion The TLS protocol version to use for connecting to the adapter. + * @param ctx The test context + */ + @ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN) + @ValueSource(strings = { IntegrationTestSupport.TLS_VERSION_1_2, IntegrationTestSupport.TLS_VERSION_1_3 }) + public void testConnectSucceedsForRegisteredDevice(final String tlsVersion, final VertxTestContext ctx) { + + final Tenant tenant = new Tenant(); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + .compose(ok -> connectToAdapter( + tlsVersion, + IntegrationTestSupport.getUsername(deviceId, tenantId), + password, + null, + null)) + .onComplete(ctx.succeeding(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter opens a connection to a registered device that authenticates + * using a JSON Web Token. + * + * @param ctx The test context + * @throws NoSuchAlgorithmException if the JVM does not support ECC cryptography. + */ + @Test + public void testConnectJwtSucceedsForRegisteredDevice(final VertxTestContext ctx) throws NoSuchAlgorithmException { + + final var generator = KeyPairGenerator.getInstance(CredentialsConstants.EC_ALG); + final var keyPair = generator.generateKeyPair(); + final var rpkCredential = Credentials.createRPKCredential(deviceId, keyPair.getPublic()); + + final var jws = Jwts.builder() + .header().type("JWT") + .and() + .audience().add(CredentialsConstants.AUDIENCE_HONO_ADAPTER) + .and() + .issuer(deviceId) + .subject(deviceId) + .claim(CredentialsConstants.CLAIM_TENANT_ID, tenantId) + .issuedAt(Date.from(Instant.now())) + .expiration(Date.from(Instant.now().plus(Duration.ofMinutes(10)))) + .signWith(keyPair.getPrivate()) + .compact(); + + helper.registry.addTenant(tenantId) + .compose(res -> helper.registry.registerDevice(tenantId, deviceId)) + .compose(res -> helper.registry.addCredentials(tenantId, deviceId, Set.of(rpkCredential))) + .compose(ok -> connectToAdapter("ignored", jws)) + .onComplete(ctx.succeeding(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter opens a connection to a registered device that authenticates + * using a Google IoT Core style JSON Web Token in conjunction with an MQTT connection + * identifier that contains the tenant and authentication ID. + * + * @param ctx The test context + * @throws NoSuchAlgorithmException if the JVM does not support ECC cryptography. + */ + @Test + public void testConnectGoogleIoTCoreJwtSucceedsForRegisteredDevice(final VertxTestContext ctx) throws NoSuchAlgorithmException { + + final var generator = KeyPairGenerator.getInstance(CredentialsConstants.EC_ALG); + final var keyPair = generator.generateKeyPair(); + final var rpkCredential = Credentials.createRPKCredential(deviceId, keyPair.getPublic()); + + final var jws = Jwts.builder() + .header().type("JWT") + .and() + .issuedAt(Date.from(Instant.now())) + .expiration(Date.from(Instant.now().plus(Duration.ofMinutes(10)))) + .signWith(keyPair.getPrivate()) + .compact(); + final var clientId = MqttClientIdentifier.of("tenants/%s/devices/%s".formatted(tenantId, deviceId)); + + helper.registry.addTenant(tenantId) + .compose(res -> helper.registry.registerDevice(tenantId, deviceId)) + .compose(res -> helper.registry.addCredentials(tenantId, deviceId, Set.of(rpkCredential))) + .compose(ok -> connectToAdapter(IntegrationTestSupport.TLS_VERSION_1_2, "ignored", jws, clientId, null)) + .onComplete(ctx.succeeding(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds. + * + * @param ctx The test context + */ + @Test + public void testConnectX509SucceedsForRegisteredDevice(final VertxTestContext ctx) { + + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert); + }) + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.succeeding(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds + * for a device belonging to a tenant that uses the same trust anchor as another tenant. + * + * @param ctx The test context + */ + @Test + @EnabledIfDnsRebindingIsSupported + @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true) + public void testConnectX509SucceedsUsingSni(final VertxTestContext ctx) { + + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + // GIVEN two tenants belonging to the same trust anchor group + final var tenant = Tenants.createTenantForTrustAnchor(cert) + .setTrustAnchorGroup("test-group"); + // which both use the same CA + return helper.registry.addTenant(helper.getRandomTenantId(), tenant) + // and a device belonging to one of the tenants + .compose(ok -> helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert)); + }) + // WHEN the device connects to the adapter including its tenant ID in the host name + .compose(ok -> connectToAdapter( + deviceCert, + IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, tenantId))) + .onComplete(ctx.succeeding(conAckMsg -> { + // THEN the connection attempt succeeds + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds + * for a device belonging to a tenant with a tenant alias. + * + * @param ctx The test context + */ + @Test + @EnabledIfDnsRebindingIsSupported + @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true, tenantAlias = true) + public void testConnectX509SucceedsUsingSniWithTenantAlias(final VertxTestContext ctx) { + + helper.getCertificate(deviceCert.certificatePath()) + // GIVEN two tenants belonging to the same trust anchor group + // which both use the same CA + .compose(cert -> helper.registry.addTenant( + helper.getRandomTenantId(), + Tenants.createTenantForTrustAnchor(cert).setTrustAnchorGroup("test-group")) + .map(cert)) + // and a device belonging to one of the tenants which has an alias configured + .compose(cert -> helper.registry.addDeviceForTenant( + tenantId, + Tenants.createTenantForTrustAnchor(cert) + .setTrustAnchorGroup("test-group") + .setAlias("test-alias"), + deviceId, + cert)) + // WHEN the device connects to the adapter including the tenant alias in the host name + .compose(ok -> connectToAdapter( + deviceCert, + IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, "test-alias"))) + .onComplete(ctx.succeeding(conAckMsg -> { + // THEN the connection attempt succeeds + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that an attempt to open a connection using a valid X.509 client certificate fails + * for a device belonging to a tenant using a non-existing tenant alias. + * + * @param ctx The test context + */ + @Test + @EnabledIfDnsRebindingIsSupported + @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true, tenantAlias = true) + public void testConnectX509FailsUsingSniWithNonExistingTenantAlias(final VertxTestContext ctx) { + + helper.getCertificate(deviceCert.certificatePath()) + // GIVEN two tenants belonging to the same trust anchor group + // which both use the same CA + .compose(cert -> helper.registry.addTenant( + helper.getRandomTenantId(), + Tenants.createTenantForTrustAnchor(cert).setTrustAnchorGroup("test-group")) + .map(cert)) + // and a device belonging to one of the tenants which has an alias configured + .compose(cert -> helper.registry.addDeviceForTenant( + tenantId, + Tenants.createTenantForTrustAnchor(cert) + .setTrustAnchorGroup("test-group") + .setAlias("test-alias"), + deviceId, + cert)) + // WHEN the device connects to the adapter including a wrong tenant alias in the host name + .compose(ok -> connectToAdapter( + deviceCert, + IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, "wrong-alias"))) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter opens a connection if auto-provisioning is enabled for the device certificate, and it + * is not configured auto-provisioning-device-id template and auth-id template. + * + * @param ctx The test context + */ + @Test + public void testConnectSucceedsWithAutoProvisioningWithoutTemplate(final VertxTestContext ctx) { + + final Promise autoProvisionedDeviceId = Promise.promise(); + + final IdentityTemplate defaultTemplate = new IdentityTemplate( + RegistryManagementConstants.PLACEHOLDER_SUBJECT_DN); + + final Future certTracker = helper.getCertificate(deviceCert.certificatePath()); + final Future subjectDNTracker = certTracker + .map(cert -> cert.getSubjectX500Principal().getName(X500Principal.RFC2253)); + + helper.createAutoProvisioningNotificationConsumer(ctx, autoProvisionedDeviceId, tenantId) + .compose(ok -> certTracker) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.getTrustedCertificateAuthorities().get(0).setAutoProvisioningEnabled(true); + return helper.registry.addTenant(tenantId, tenant); + }) + .compose(ok -> connectToAdapter(deviceCert)) + .compose(ok -> autoProvisionedDeviceId.future()) + .compose(deviceId -> { + // verify the device ID is not generated by subject-dn template + ctx.verify( + () -> assertThat(deviceId).isNotEqualTo(defaultTemplate.apply(subjectDNTracker.result()))); + return helper.registry.getRegistrationInfo(tenantId, deviceId); + }) + .compose(registrationResult -> { + ctx.verify(() -> { + final var infoRegistration = registrationResult.bodyAsJsonObject(); + IntegrationTestSupport.assertDeviceStatusProperties( + infoRegistration.getJsonObject(RegistryManagementConstants.FIELD_STATUS), + true); + }); + return helper.registry.getCredentials(tenantId, autoProvisionedDeviceId.future().result()); + }) + .onComplete(ctx.succeeding(credentialsResult -> { + ctx.verify(() -> { + final var infoCredentials = credentialsResult.bodyAsJsonArray(); + // verify the auth ID is generated by subject-dn template + assertThat(infoCredentials.getJsonObject(0) + .getString(RegistryManagementConstants.FIELD_AUTH_ID)) + .isEqualTo(defaultTemplate.apply(subjectDNTracker.result())); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter opens a connection if auto-provisioning is enabled for the device certificate, and it + * is configured auto-provisioning-device-id template and auth-id template. + * + * @param ctx The test context + */ + @Test + public void testConnectSucceedsWithAutoProvisioningWithTemplate(final VertxTestContext ctx) { + + final Promise autoProvisionedDeviceId = Promise.promise(); + + final IdentityTemplate deviceIdTemplate = new IdentityTemplate("{{subject-dn}}"); + final IdentityTemplate authIdTemplate = new IdentityTemplate("{{subject-cn}}"); + + final Future certTracker = helper.getCertificate(deviceCert.certificatePath()); + final Future subjectDNTracker = certTracker + .map(cert -> cert.getSubjectX500Principal().getName(X500Principal.RFC2253)); + + helper.createAutoProvisioningNotificationConsumer(ctx, autoProvisionedDeviceId, tenantId) + .compose(ok -> certTracker) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.getTrustedCertificateAuthorities().get(0) + .setAutoProvisioningEnabled(true) + .setAutoProvisioningDeviceIdTemplate(deviceIdTemplate.toString()) + .setAuthIdTemplate(authIdTemplate.toString()); + return helper.registry.addTenant(tenantId, tenant); + }) + .compose(ok -> connectToAdapter(deviceCert)) + .compose(ok -> autoProvisionedDeviceId.future()) + .compose(deviceId -> { + // verify the device ID is generated by auto-provisioning-device-id template + ctx.verify(() -> assertThat(deviceId).isEqualTo(deviceIdTemplate.apply(subjectDNTracker.result()))); + return helper.registry.getRegistrationInfo(tenantId, deviceId); + }) + .compose(registrationResult -> { + ctx.verify(() -> { + final var infoRegistration = registrationResult.bodyAsJsonObject(); + IntegrationTestSupport.assertDeviceStatusProperties( + infoRegistration.getJsonObject(RegistryManagementConstants.FIELD_STATUS), + true); + }); + final var deviceId = deviceIdTemplate.apply(subjectDNTracker.result()); + return helper.registry.getCredentials(tenantId, deviceId); + }) + .onComplete(ctx.succeeding(credentialsResult -> { + ctx.verify(() -> { + final var infoCredentials = credentialsResult.bodyAsJsonArray(); + // verify the auth ID is generated by auth-id template + assertThat(infoCredentials.getJsonObject(0) + .getString(RegistryManagementConstants.FIELD_AUTH_ID)) + .isEqualTo(authIdTemplate.apply(subjectDNTracker.result())); + }); + ctx.completeNow(); + + })); + } + + /** + * Verifies that the adapter rejects connection attempts from an unknown device for which auto-provisioning is + * disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsIfAutoProvisioningIsDisabled(final VertxTestContext ctx) { + + // GIVEN a tenant configured with a trust anchor that does not allow auto-provisioning + // WHEN an unknown device tries to connect + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.getTrustedCertificateAuthorities().get(0).setAutoProvisioningEnabled(false); + return helper.registry.addTenant(tenantId, tenant); + }) + // WHEN a unknown device tries to connect to the adapter + // using a client certificate with the trust anchor registered for the device's tenant + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from unknown devices + * for which neither registration information nor credentials are on record. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForNonExistingDevice(final VertxTestContext ctx) { + + // GIVEN an adapter + // WHEN an unknown device tries to connect + connectToAdapter(IntegrationTestSupport.getUsername("non-existing", Constants.DEFAULT_TENANT), "secret") + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from unknown devices + * trying to authenticate using a client certificate but for which neither + * registration information nor credentials are on record. + * + * @param ctx The test context + */ + @Test + public void testConnectX509FailsForNonExistingDevice(final VertxTestContext ctx) { + + // GIVEN an adapter + // WHEN an unknown device tries to connect + connectToAdapter(deviceCert) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices + * using wrong credentials. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForWrongCredentials(final VertxTestContext ctx) { + + // GIVEN a registered device + final Tenant tenant = new Tenant(); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + // WHEN the device tries to connect using a wrong password + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), "wrong password")) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices + * using credentials that contain a non-existing tenant. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForNonExistingTenant(final VertxTestContext ctx) { + + // GIVEN a registered device + final Tenant tenant = new Tenant(); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + // WHEN a device of a non-existing tenant tries to connect + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, "nonExistingTenant"), "secret")) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices using a client certificate with an unknown + * subject DN. + * + * @param ctx The test context + */ + @Test + public void testConnectX509FailsForUnknownSubjectDN(final VertxTestContext ctx) { + + // GIVEN a registered device + + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + return helper.registry.addTenant(tenantId, tenant); + }).compose(ok -> helper.registry.registerDevice(tenantId, deviceId)) + .compose(ok -> { + final String authId = new X500Principal("CN=4711").getName(X500Principal.RFC2253); + final var credential = X509CertificateCredential.fromAuthId(authId, List.of(new X509CertificateSecret())); + return helper.registry.addCredentials(tenantId, deviceId, Collections.singleton(credential)); + }) + // WHEN the device tries to connect using a client certificate with an unknown subject DN + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices belonging to a tenant for which the MQTT + * adapter has been disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForDisabledAdapter(final VertxTestContext ctx) { + + final Tenant tenant = new Tenant(); + tenant.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false)); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + // WHEN a device that belongs to the tenant tries to connect to the adapter + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices + * using a client certificate which belong to a tenant for which the + * MQTT adapter has been disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectX509FailsForDisabledAdapter(final VertxTestContext ctx) { + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false)); + return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert); + }) + // WHEN a device that belongs to the tenant tries to connect to the adapter + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices for which credentials exist but are disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForDisabledCredentials(final VertxTestContext ctx) { + + helper.registry + .addTenant(tenantId) + .compose(ok -> { + return helper.registry.registerDevice(tenantId, deviceId); + }) + .compose(ok -> { + final PasswordCredential secret = Credentials.createPasswordCredential(deviceId, password); + secret.setEnabled(false); + return helper.registry.addCredentials(tenantId, deviceId, List.of(secret)); + }) + // WHEN a device connects using the correct credentials + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices for which credentials exist but the device is + * disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForDisabledDevice(final VertxTestContext ctx) { + + final Tenant tenant = new Tenant(); + + helper.registry + .addTenant(tenantId, tenant) + .compose(ok -> { + final var device = new Device(); + device.setEnabled(false); + return helper.registry.registerDevice(tenantId, deviceId, device); + }) + .compose(ok -> { + final PasswordCredential secret = Credentials.createPasswordCredential(deviceId, password); + return helper.registry.addCredentials(tenantId, deviceId, Collections.singleton(secret)); + }) + // WHEN a device connects using the correct credentials + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices that belong to a disabled tenant. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForDisabledTenant(final VertxTestContext ctx) { + + // Given a disabled tenant for which the MQTT adapter is enabled + final Tenant tenant = new Tenant(); + tenant.setEnabled(false); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices + * using a client certificate that belong to a disabled tenant. + * + * @param ctx The test context + */ + @Test + public void testConnectX509FailsForDisabledTenant(final VertxTestContext ctx) { + + // Given a disabled tenant for which the MQTT adapter is enabled + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.setEnabled(false); + return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert); + }) + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter closes the connection to a device when the registration data of the device + * is deleted. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnDeviceDeleted(final VertxTestContext ctx) { + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.deregisterDevice(tenantId, deviceId)); + } + + /** + * Verifies that the adapter closes the connection to a device when the registration data of the device + * is disabled. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnDeviceDisabled(final VertxTestContext ctx) { + final JsonObject updatedDeviceData = new JsonObject() + .put(RegistrationConstants.FIELD_ENABLED, Boolean.FALSE); + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.updateDevice(tenantId, deviceId, updatedDeviceData)); + } + + /** + * Verifies that the adapter closes the connection to a device when the tenant of the device + * is deleted. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnTenantDeleted(final VertxTestContext ctx) { + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.removeTenant(tenantId)); + } + + /** + * Verifies that the adapter closes the connection to a device when the tenant of the device + * is disabled. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnTenantDisabled(final VertxTestContext ctx) { + final Tenant updatedTenant = new Tenant().setEnabled(false); + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.updateTenant(tenantId, updatedTenant, HttpURLConnection.HTTP_NO_CONTENT)); + } + + /** + * Verifies that the adapter closes the connection to a device when registration data for all devices of the device + * tenant is deleted. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnAllDevicesOfTenantDeleted(final VertxTestContext ctx) { + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.deregisterDevicesOfTenant(tenantId)); + } + + private void testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted( + final VertxTestContext ctx, + final Supplier> deviceRegistryChangeOperation) { + + final Promise connectionClosedPromise = Promise.promise(); + final var disconnectedListener = new MqttClientDisconnectedListener() { + @Override + public void onDisconnected(@NotNull final MqttClientDisconnectedContext context) { + connectionClosedPromise.complete(); + } + }; + + // GIVEN a connected device + helper.registry + .addDeviceForTenant(tenantId, new Tenant(), deviceId, password) + .compose(ok -> connectToAdapter( + IntegrationTestSupport.TLS_VERSION_1_2, + IntegrationTestSupport.getUsername(deviceId, tenantId), + password, + null, + disconnectedListener)) + .compose(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + // WHEN corresponding device/tenant is removed/disabled + return deviceRegistryChangeOperation.get(); + }) + // THEN the device connection is closed + .compose(ok -> connectionClosedPromise.future()) + .onComplete(ctx.succeedingThenComplete()); + } +} diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java new file mode 100644 index 0000000000..95d8620368 --- /dev/null +++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java @@ -0,0 +1,285 @@ +/******************************************************************************* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.tests.mqtt5; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManagerFactory; + +import org.eclipse.hono.tests.IntegrationTestSupport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.hivemq.client.mqtt.MqttClientSslConfig; +import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.net.PemTrustOptions; +import io.vertx.core.net.SelfSignedCertificate; +import io.vertx.junit5.VertxTestContext; + +/** + * Base class for MQTT adapter integration tests using MQTT 5.0. + * + */ +public abstract class MqttTestBase { + + private static final Vertx vertx = Vertx.vertx(); + + private static TrustManagerFactory trustManagerFactory; + private static HostnameVerifier acceptAllHostnames; + + /** + * A logger to be used by subclasses. + */ + protected final Logger LOGGER = LoggerFactory.getLogger(getClass()); + + /** + * A helper accessing the AMQP 1.0 Messaging Network and + * for managing tenants/devices/credentials. + */ + protected IntegrationTestSupport helper; + /** + * A client for publishing messages to the MQTT protocol adapter. + */ + protected Mqtt5AsyncClient mqttClient; + + /** + * Creates default MQTT client TLS options. + * + * @throws Exception if the trust manager factory cannot be created. + */ + @BeforeAll + public static void init() throws Exception { + + trustManagerFactory = new PemTrustOptions() + .addCertPath(IntegrationTestSupport.TRUST_STORE_PATH) + .getTrustManagerFactory(vertx); + acceptAllHostnames = new HostnameVerifier() { + @Override + public boolean verify(final String hostname, final SSLSession session) { + // disable host name verification + return true; + } + }; + } + + /** + * Create integration test helper. + * + * @param testInfo The JUnit test info. + * @param ctx The vert.x test context. + */ + @BeforeEach + public void createHelper(final TestInfo testInfo, final VertxTestContext ctx) { + LOGGER.info("running {}", testInfo.getDisplayName()); + helper = new IntegrationTestSupport(vertx); + helper.init().onComplete(ctx.succeedingThenComplete()); + } + + /** + * Closes the connection to the AMQP 1.0 Messaging Network. + * + * @param ctx The vert.x test context. + */ + @AfterEach + public void closeConnectionToAmqpMessagingNetwork(final VertxTestContext ctx) { + + helper.disconnect().onComplete(r -> ctx.completeNow()); + } + + /** + * Closes the connection to the MQTT adapter and deletes device registry entries created during the test. + * + * @param ctx The vert.x test context. + */ + @AfterEach + public void closeMqttAdapterConnectionAndCleanupDeviceRegistry(final VertxTestContext ctx) { + + final Promise disconnectHandler = Promise.promise(); + if (mqttClient == null) { + disconnectHandler.complete(); + } else { + Future.fromCompletionStage(mqttClient.disconnect()).onComplete(disconnectHandler); + } + disconnectHandler.future().onComplete(closeAttempt -> { + LOGGER.info("connection to MQTT adapter closed"); + mqttClient = null; + // cleanup device registry - done after the adapter connection is closed because otherwise + // the adapter would close the connection from its end after having received the device deletion notification + helper.deleteObjects(ctx); + }); + } + + /** + * Opens a connection to the MQTT adapter using given credentials. + * + * @param username The username to use for authentication. + * @param password The password to use for authentication. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + */ + protected final Future connectToAdapter( + final String username, + final String password) { + return connectToAdapter(IntegrationTestSupport.TLS_VERSION_1_2, username, password, null, null); + } + + /** + * Opens a connection to the MQTT adapter using given credentials. + * + * @param tlsVersion The TLS protocol version to use for connecting to the adapter. + * @param username The username to use for authentication. + * @param password The password to use for authentication. + * @param mqttClientId MQTT client identifier to use when connecting to the MQTT adapter or + * {@code null}, if an arbitrary identifier should be used. + * @param disconnectedListener A listener to be invoked when the connection to the adapter is lost + * or {@code null}, if no listener should be registered. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + */ + protected final Future connectToAdapter( + final String tlsVersion, + final String username, + final String password, + final MqttClientIdentifier mqttClientId, + final MqttClientDisconnectedListener disconnectedListener) { + + final var sslConfig = MqttClientSslConfig.builder() + .trustManagerFactory(trustManagerFactory) + .hostnameVerifier(acceptAllHostnames) + .protocols(Set.of(tlsVersion)) + .build(); + final var auth = Mqtt5SimpleAuth.builder() + .username(username) + .password(password.getBytes(StandardCharsets.UTF_8)) + .build(); + return connectToAdapter(sslConfig, IntegrationTestSupport.MQTT_HOST, mqttClientId, auth, disconnectedListener); + } + + /** + * Opens a connection to the MQTT adapter using an X.509 client certificate. + * + * @param cert The client certificate to use for authentication. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + * @throws NullPointerException if client certificate is {@code null}. + * @throws IllegalArgumentException if the certificate cannot be used for authenticating to the MQTT adapter. + */ + protected final Future connectToAdapter(final SelfSignedCertificate cert) { + return connectToAdapter(cert, IntegrationTestSupport.MQTT_HOST); + } + + /** + * Opens a connection to the MQTT adapter using an X.509 client certificate. + * + * @param cert The client certificate to use for authentication. + * @param hostname The name of the host to connect to. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + * @throws NullPointerException if any of the parameters are {@code null}. + * @throws IllegalArgumentException if the certificate cannot be used for authenticating to the MQTT adapter. + */ + protected final Future connectToAdapter( + final SelfSignedCertificate cert, + final String hostname) { + + Objects.requireNonNull(cert); + Objects.requireNonNull(hostname); + + final KeyManagerFactory selfSignedKeyManagerFactory; + try { + selfSignedKeyManagerFactory = cert.keyCertOptions().getKeyManagerFactory(vertx); + } catch (final Exception e) { + throw new IllegalArgumentException(e.getMessage(), e.getCause()); + } + final var sslConfig = MqttClientSslConfig.builder() + .trustManagerFactory(trustManagerFactory) + .hostnameVerifier(acceptAllHostnames) + .protocols(Set.of(IntegrationTestSupport.TLS_VERSION_1_2)) + .keyManagerFactory(selfSignedKeyManagerFactory) + .build(); + + return connectToAdapter(sslConfig, hostname, null, null, null); + } + + /** + * Opens a connection to the MQTT adapter using given options. + * + * @param sslConfig The SSL options to use for connecting to the adapter. + * @param hostname The name of the host to connect to. + * @param mqttClientId MQTT client identifier to use when connecting to the MQTT adapter or + * {@code null}, if an arbitrary identifier should be used. + * @param auth The credentials to use for authenticating to the adapter or {@code null}, if + * a client certificate (set in the SSL configuration) should be used. + * @param disconnectedListener A listener to be invoked when the connection to the adapter is lost + * or {@code null}, if no listener should be registered. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + * @throws NullPointerException if any of SSL config or host name are {@code null}. + */ + protected final Future connectToAdapter( + final MqttClientSslConfig sslConfig, + final String hostname, + final MqttClientIdentifier mqttClientId, + final Mqtt5SimpleAuth auth, + final MqttClientDisconnectedListener disconnectedListener) { + + Objects.requireNonNull(sslConfig); + Objects.requireNonNull(hostname); + + final var clientId = Optional.ofNullable(mqttClientId).orElse(MqttClientIdentifier.of(UUID.randomUUID().toString())); + final var builder = Mqtt5Client.builder() + .identifier(clientId) + .sslConfig(sslConfig) + .serverHost(hostname) + .serverPort(IntegrationTestSupport.MQTTS_PORT); + Optional.ofNullable(disconnectedListener).ifPresent(builder::addDisconnectedListener); + mqttClient = builder.buildAsync(); + final var connect = Mqtt5Connect.builder() + .cleanStart(true) + .keepAlive(10) + .simpleAuth(auth) + .build(); + return Future.fromCompletionStage(mqttClient.connect(connect)).onSuccess(conAck -> { + LOGGER.info( + "MQTTS connection to adapter [host: {}, port: {}] established", + hostname, IntegrationTestSupport.MQTTS_PORT); + }); + } +}