diff --git a/org.eclipse.paho.client.mqttv3/pom.xml b/org.eclipse.paho.client.mqttv3/pom.xml index e90d622e0..84fb9f69c 100644 --- a/org.eclipse.paho.client.mqttv3/pom.xml +++ b/org.eclipse.paho.client.mqttv3/pom.xml @@ -145,6 +145,6 @@ - 1.2.7.timer-ping-sender + 1.2.8.mqtt-suback-failure diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttActionListener.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttActionListener.java index 657c5e5b5..7dba0830c 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttActionListener.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttActionListener.java @@ -1,5 +1,7 @@ package org.eclipse.paho.client.mqttv3; +import java.util.List; + /** * Implementors of this interface will be notified when an asynchronous action completes. * @@ -27,4 +29,8 @@ public interface IMqttActionListener { * @param exception thrown by the action that has failed */ void onFailure(IMqttToken asyncActionToken, Throwable exception); + + default void onSubscribeResult(List successTopics, List failedTopics) { + return; + } } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java index bcf2a5349..9f72d8e83 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java @@ -18,6 +18,7 @@ */ package org.eclipse.paho.client.mqttv3.internal; +import java.util.Arrays; import java.util.Enumeration; import java.util.Hashtable; import java.util.Vector; @@ -33,10 +34,7 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttToken; import org.eclipse.paho.client.mqttv3.MqttTopic; -import org.eclipse.paho.client.mqttv3.internal.wire.MqttPubAck; -import org.eclipse.paho.client.mqttv3.internal.wire.MqttPubComp; -import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish; -import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage; +import org.eclipse.paho.client.mqttv3.internal.wire.*; import org.eclipse.paho.client.mqttv3.logging.Logger; import org.eclipse.paho.client.mqttv3.logging.LoggerFactory; @@ -326,7 +324,14 @@ public void fireActionEvent(MqttToken token) { if (token != null) { IMqttActionListener asyncCB = token.getActionCallback(); if (asyncCB != null) { - if (token.getException() == null) { + if ( + token.getResponse() instanceof MqttSuback && + Arrays.stream(((MqttSuback) token.getResponse()).getGrantedQos()).anyMatch(argument -> argument == 128) + ) { + log.fine(CLASS_NAME, methodName, "716", + new Object[] { token.internalTok.getKey() }); + asyncCB.onFailure(token, new IllegalStateException("the mqtt subscription failed")); + } else if (token.getException() == null) { // @TRACE 716=call onSuccess key={0} log.fine(CLASS_NAME, methodName, "716", new Object[] { token.internalTok.getKey() });