From 6e962d9dbcfb472cedd17b192f42bf17432030b3 Mon Sep 17 00:00:00 2001 From: J-N-K Date: Sat, 7 Dec 2019 18:44:35 +0100 Subject: [PATCH] fix multiple MQTT subscriptions (#1259) Signed-off-by: Jan N. Klug --- .../transport/mqtt/MqttBrokerConnection.java | 70 +++++++++------- .../client/Mqtt3AsyncClientWrapper.java | 6 +- .../client/Mqtt5AsyncClientWrapper.java | 6 +- .../client/MqttAsyncClientWrapper.java | 81 ++++++++++++++++++- 4 files changed, 123 insertions(+), 40 deletions(-) diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java index 9d73ef7ff..f9b0e0da8 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java @@ -72,16 +72,14 @@ public class MqttBrokerConnection { * MQTT transport protocols */ public enum Protocol { - TCP, - WEBSOCKETS + TCP, WEBSOCKETS } /** * MQTT version (currently v3 and v5) */ public enum MqttVersion { - V3, - V5 + V3, V5 } // Connection parameters @@ -197,9 +195,9 @@ public class MqttBrokerConnection { * @param port A port or null to select the default port for a secure or insecure connection * @param secure A secure connection * @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are - * used for access restriction implementations. - * If none is specified, a default is generated. The client id cannot be longer than 65535 - * characters. + * used for access restriction implementations. + * If none is specified, a default is generated. The client id cannot be longer than 65535 + * characters. * @throws IllegalArgumentException If the client id or port is not valid. */ public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, @Nullable String clientId) { @@ -214,9 +212,9 @@ public class MqttBrokerConnection { * @param port A port or null to select the default port for a secure or insecure connection * @param secure A secure connection * @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are - * used for access restriction implementations. - * If none is specified, a default is generated. The client id cannot be longer than 65535 - * characters. + * used for access restriction implementations. + * If none is specified, a default is generated. The client id cannot be longer than 65535 + * characters. * @throws IllegalArgumentException If the client id or port is not valid. */ @Deprecated @@ -234,9 +232,9 @@ public class MqttBrokerConnection { * @param port A port or null to select the default port for a secure or insecure connection * @param secure A secure connection * @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are - * used for access restriction implementations. - * If none is specified, a default is generated. The client id cannot be longer than 65535 - * characters. + * used for access restriction implementations. + * If none is specified, a default is generated. The client id cannot be longer than 65535 + * characters. * @throws IllegalArgumentException If the client id or port is not valid. */ public MqttBrokerConnection(Protocol protocol, MqttVersion mqttVersion, String host, @Nullable Integer port, @@ -427,7 +425,7 @@ public class MqttBrokerConnection { * * @param lastWill The last will object or null. * @param applyImmediately If true, the connection will stopped and started for the new last-will to take effect - * immediately. + * immediately. * @throws MqttException * @throws ConfigurationException */ @@ -483,8 +481,9 @@ public class MqttBrokerConnection { if (isConnecting) { return MqttConnectionState.CONNECTING; } - return (client != null && client.getState().isConnected()) ? MqttConnectionState.CONNECTED - : MqttConnectionState.DISCONNECTED; + return (client != null && client.getState().isConnected()) ? + MqttConnectionState.CONNECTED : + MqttConnectionState.DISCONNECTED; } /** @@ -520,7 +519,7 @@ public class MqttBrokerConnection { * Set the ssl context provider. The default provider is {@see AcceptAllCertifcatesSSLContext}. * * @return The ssl context provider. Should not be null, but the ssl context will in fact - * only be used if a ssl:// url is given. + * only be used if a ssl:// url is given. */ @Deprecated public void setSSLContextProvider(SSLContextProvider sslContextProvider) { @@ -561,14 +560,21 @@ public class MqttBrokerConnection { return future; } if (client.getState().isConnected()) { - client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> { - if (t == null) { - logger.trace("Subscribed {} to topic {}", subscriber, topic); - future.complete(true); - } else { - future.completeExceptionally(new MqttException(t)); - } - }); + if (!client.isSubscribed(topic)) { + // "real" subscription for first subscriber + client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> { + if (t == null) { + logger.trace("Subscribed {} to topic {}", subscriber, topic); + future.complete(true); + } else { + future.completeExceptionally(new MqttException(t)); + } + }); + } else { + // client already subscribed + logger.trace("Subscribed {} to topic {} (follow-up subscription)", subscriber, topic); + future.complete(true); + } } else { // The subscription will be performed on connecting. future.complete(false); @@ -619,7 +625,8 @@ public class MqttBrokerConnection { } list.remove(subscriber); if (!list.isEmpty()) { - logger.trace("Removed {} from topic subscribers for topic {}, but other subscribers present", subscriber, topic); + logger.trace("Removed {} from topic subscribers for topic {}, but other subscribers present", + subscriber, topic); return CompletableFuture.completedFuture(true); } // Remove from subscriber list @@ -627,7 +634,8 @@ public class MqttBrokerConnection { // No more subscribers to this topic. Unsubscribe topic on the broker MqttAsyncClientWrapper mqttClient = this.client; if (mqttClient != null) { - logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from connection", subscriber, topic); + logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from connection", + subscriber, topic); return unsubscribeRaw(mqttClient, topic); } else { return CompletableFuture.completedFuture(false); @@ -641,7 +649,7 @@ public class MqttBrokerConnection { * @param client The client connection * @param topic The topic to unsubscribe from * @return Completes with true if successful. Completes with false if no broker connection is established. - * Exceptionally otherwise. + * Exceptionally otherwise. */ protected CompletableFuture unsubscribeRaw(MqttAsyncClientWrapper client, String topic) { logger.trace("Unsubscribing message consumer for topic '{}' from broker '{}'", topic, host); @@ -691,7 +699,7 @@ public class MqttBrokerConnection { * do nothing if there is already an active connection. * * @return Returns a future that completes with true if already connected or connecting, - * completes with false if a connection timeout has happened and completes exceptionally otherwise. + * completes with false if a connection timeout has happened and completes exceptionally otherwise. */ public CompletableFuture start() { // We don't want multiple concurrent threads to start a connection @@ -877,7 +885,7 @@ public class MqttBrokerConnection { * @param topic The topic * @param payload The message payload * @return Returns a future that completes with a result of true if the publishing succeeded and completes - * exceptionally on an error or with a result of false if no broker connection is established. + * exceptionally on an error or with a result of false if no broker connection is established. */ public CompletableFuture publish(String topic, byte[] payload) { return publish(topic, payload, getQos(), isRetain()); @@ -891,7 +899,7 @@ public class MqttBrokerConnection { * @param qos The quality of service for this message * @param retain Set to true to retain the message on the broker * @return Returns a future that completes with a result of true if the publishing succeeded and completes - * exceptionally on an error or with a result of false if no broker connection is established. + * exceptionally on an error or with a result of false if no broker connection is established. */ public CompletableFuture publish(String topic, byte[] payload, int qos, boolean retain) { final MqttAsyncClientWrapper client = this.client; diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java index c18677cf8..f9e3b979c 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java @@ -56,7 +56,7 @@ public class Mqtt3AsyncClientWrapper extends MqttAsyncClientWrapper { } client = clientBuilder.buildAsync(); - }; + } @Override public MqttClientState getState() { @@ -64,14 +64,14 @@ public class Mqtt3AsyncClientWrapper extends MqttAsyncClientWrapper { } @Override - public CompletableFuture subscribe(String topic, int qos, ClientCallback clientCallback) { + public CompletableFuture internalSubscribe(String topic, int qos, ClientCallback clientCallback) { Mqtt3Subscribe subscribeMessage = Mqtt3Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos)) .build(); return client.subscribe(subscribeMessage, clientCallback::messageArrived); } @Override - public CompletableFuture unsubscribe(String topic) { + public CompletableFuture internalUnsubscribe(String topic) { Mqtt3Unsubscribe unsubscribeMessage = Mqtt3Unsubscribe.builder().topicFilter(topic).build(); return client.unsubscribe(unsubscribeMessage); } diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java index 9f01d5986..f1e75b141 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java @@ -57,7 +57,7 @@ public class Mqtt5AsyncClientWrapper extends MqttAsyncClientWrapper { } client = clientBuilder.buildAsync(); - }; + } @Override public MqttClientState getState() { @@ -65,14 +65,14 @@ public class Mqtt5AsyncClientWrapper extends MqttAsyncClientWrapper { } @Override - public CompletableFuture subscribe(String topic, int qos, ClientCallback clientCallback) { + public CompletableFuture internalSubscribe(String topic, int qos, ClientCallback clientCallback) { Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos)) .build(); return client.subscribe(subscribeMessage, clientCallback::messageArrived); } @Override - public CompletableFuture unsubscribe(String topic) { + public CompletableFuture internalUnsubscribe(String topic) { Mqtt5Unsubscribe unsubscribeMessage = Mqtt5Unsubscribe.builder().topicFilter(topic).build(); return client.unsubscribe(unsubscribeMessage); } diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java index b754837a3..2a8ec6066 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java @@ -12,6 +12,8 @@ */ package org.eclipse.smarthome.io.transport.mqtt.internal.client; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -21,28 +23,101 @@ import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback; import com.hivemq.client.mqtt.MqttClientState; import com.hivemq.client.mqtt.datatypes.MqttQos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The {@link AbstractMqttAsyncClient} is the base class for async client wrappers + * The {@link MqttAsyncClientWrapper} is the base class for async client wrappers * * @author Jan N. Klug - Initial contribution */ @NonNullByDefault public abstract class MqttAsyncClientWrapper { + private final Logger logger = LoggerFactory.getLogger(MqttAsyncClientWrapper.class); + private final List subscribedTopics = new ArrayList<>(); + + /** + * connect this client + * + * @param lwt last-will and testament (optional) + * @param keepAliveInterval keep-alive interval in ms + * @param username username (optional) + * @param password password (optional) + * @return a CompletableFuture (exceptionally on fail) + */ public abstract CompletableFuture connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval, @Nullable String username, @Nullable String password); + /** + * disconnect this client + * + * @return a CompletableFuture (exceptionally on fail) + */ public abstract CompletableFuture disconnect(); + /** + * get the connection state of this client + * + * @return the client state + */ public abstract MqttClientState getState(); - public abstract CompletableFuture subscribe(String topic, int qos, ClientCallback clientCallback); + /** + * subscribe a client callback to a topic (keeps track of subscriptions) + * + * @param topic the topic + * @param qos QoS for this subscription + * @param clientCallback the client callback that need + * @return a CompletableFuture (exceptionally on fail) + */ + public CompletableFuture subscribe(String topic, int qos, ClientCallback clientCallback) { + subscribedTopics.add(topic); + return internalSubscribe(topic, qos, clientCallback).thenApply(s -> { + logger.trace("successfully subscribed {} to topic {}", this, topic); + return true; + }); + } - public abstract CompletableFuture unsubscribe(String topic); + /** + * subscribe a client callback to a topic (keeps track of subscriptions) + * + * @param topic the topic + * @return a CompletableFuture (exceptionally on fail) + */ + public CompletableFuture unsubscribe(String topic) { + subscribedTopics.remove(topic); + return internalUnsubscribe(topic).thenApply(s -> { + logger.trace("successfully unsubscribed {} from topic {}", this, topic); + return true; + }); + } + /** + * publish a message + * + * @param topic the topic + * @param payload the message as byte array + * @param retain whether this message should be retained + * @param qos the QoS level of this message + * @return a CompletableFuture (exceptionally on fail) + */ public abstract CompletableFuture publish(String topic, byte[] payload, boolean retain, int qos); + /** + * check if this client is subscribed to a topic + * + * @param topic the topic + * @return true if subscribed + */ + public boolean isSubscribed(String topic) { + return subscribedTopics.contains(topic); + } + + protected abstract CompletableFuture internalSubscribe(String topic, int qos, ClientCallback clientCallback); + + protected abstract CompletableFuture internalUnsubscribe(String topic); + protected MqttQos getMqttQosFromInt(int qos) { switch (qos) { case 0: