diff --git a/org.eclipse.paho.client.mqttv3/pom.xml b/org.eclipse.paho.client.mqttv3/pom.xml
index e90d622e0..84fb9f69c 100644
--- a/org.eclipse.paho.client.mqttv3/pom.xml
+++ b/org.eclipse.paho.client.mqttv3/pom.xml
@@ -145,6 +145,6 @@
- 1.2.7.timer-ping-sender
+ 1.2.8.mqtt-suback-failure
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttActionListener.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttActionListener.java
index 657c5e5b5..7dba0830c 100644
--- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttActionListener.java
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttActionListener.java
@@ -1,5 +1,7 @@
package org.eclipse.paho.client.mqttv3;
+import java.util.List;
+
/**
* Implementors of this interface will be notified when an asynchronous action completes.
*
@@ -27,4 +29,8 @@ public interface IMqttActionListener {
* @param exception thrown by the action that has failed
*/
void onFailure(IMqttToken asyncActionToken, Throwable exception);
+
+ default void onSubscribeResult(List successTopics, List failedTopics) {
+ return;
+ }
}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java
index bcf2a5349..9f72d8e83 100644
--- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java
@@ -18,6 +18,7 @@
*/
package org.eclipse.paho.client.mqttv3.internal;
+import java.util.Arrays;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;
@@ -33,10 +34,7 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttToken;
import org.eclipse.paho.client.mqttv3.MqttTopic;
-import org.eclipse.paho.client.mqttv3.internal.wire.MqttPubAck;
-import org.eclipse.paho.client.mqttv3.internal.wire.MqttPubComp;
-import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish;
-import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
+import org.eclipse.paho.client.mqttv3.internal.wire.*;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
@@ -326,7 +324,14 @@ public void fireActionEvent(MqttToken token) {
if (token != null) {
IMqttActionListener asyncCB = token.getActionCallback();
if (asyncCB != null) {
- if (token.getException() == null) {
+ if (
+ token.getResponse() instanceof MqttSuback &&
+ Arrays.stream(((MqttSuback) token.getResponse()).getGrantedQos()).anyMatch(argument -> argument == 128)
+ ) {
+ log.fine(CLASS_NAME, methodName, "716",
+ new Object[] { token.internalTok.getKey() });
+ asyncCB.onFailure(token, new IllegalStateException("the mqtt subscription failed"));
+ } else if (token.getException() == null) {
// @TRACE 716=call onSuccess key={0}
log.fine(CLASS_NAME, methodName, "716",
new Object[] { token.internalTok.getKey() });