From cb5838837bb76a8f6450ac0e83755b5e020ebe39 Mon Sep 17 00:00:00 2001 From: J-N-K Date: Sun, 8 Dec 2019 22:10:23 +0100 Subject: [PATCH] make MQTT subscription tracking thread-safe (#1261) Signed-off-by: Jan N. Klug --- .../transport/mqtt/MqttBrokerConnection.java | 35 ++++++----------- .../client/MqttAsyncClientWrapper.java | 38 +++++++++---------- 2 files changed, 28 insertions(+), 45 deletions(-) diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java index f9b0e0da8..92aad903a 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java @@ -18,12 +18,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.TrustManager; @@ -107,8 +102,7 @@ public class MqttBrokerConnection { protected @Nullable MqttAsyncClientWrapper client; protected boolean isConnecting = false; protected final List connectionObservers = new CopyOnWriteArrayList<>(); - - protected final Map subscribers = new HashMap<>(); + protected final Map subscribers = new ConcurrentHashMap<>(); // Connection timeout handling protected final AtomicReference<@Nullable ScheduledFuture> timeoutFuture = new AtomicReference<>(null); @@ -560,21 +554,14 @@ public class MqttBrokerConnection { return future; } if (client.getState().isConnected()) { - if (!client.isSubscribed(topic)) { - // "real" subscription for first subscriber - client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> { - if (t == null) { - logger.trace("Subscribed {} to topic {}", subscriber, topic); - future.complete(true); - } else { - future.completeExceptionally(new MqttException(t)); - } - }); - } else { - // client already subscribed - logger.trace("Subscribed {} to topic {} (follow-up subscription)", subscriber, topic); - future.complete(true); - } + client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> { + if (t == null) { + future.complete(true); + } else { + logger.warn("Failed subscribing to topic {}", topic, t); + future.completeExceptionally(new MqttException(t)); + } + }); } else { // The subscription will be performed on connecting. future.complete(false); @@ -634,7 +621,7 @@ public class MqttBrokerConnection { // No more subscribers to this topic. Unsubscribe topic on the broker MqttAsyncClientWrapper mqttClient = this.client; if (mqttClient != null) { - logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from connection", + logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from client", subscriber, topic); return unsubscribeRaw(mqttClient, topic); } else { diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java index 2a8ec6066..3729014bd 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java @@ -12,9 +12,9 @@ */ package org.eclipse.smarthome.io.transport.mqtt.internal.client; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; @NonNullByDefault public abstract class MqttAsyncClientWrapper { private final Logger logger = LoggerFactory.getLogger(MqttAsyncClientWrapper.class); - private final List subscribedTopics = new ArrayList<>(); + private final Set subscriptions = ConcurrentHashMap.newKeySet(); /** * connect this client @@ -72,23 +72,29 @@ public abstract class MqttAsyncClientWrapper { * @return a CompletableFuture (exceptionally on fail) */ public CompletableFuture subscribe(String topic, int qos, ClientCallback clientCallback) { - subscribedTopics.add(topic); - return internalSubscribe(topic, qos, clientCallback).thenApply(s -> { - logger.trace("successfully subscribed {} to topic {}", this, topic); - return true; - }); + boolean needsSubscription = subscriptions.add(clientCallback); + if (needsSubscription) { + logger.trace("Trying to subscribe {} to topic {}", this, topic); + return internalSubscribe(topic, qos, clientCallback).thenApply(s -> { + logger.trace("Successfully subscribed {} to topic {}", this, topic); + return true; + }); + } else { + logger.trace("{} already subscribed to topic {}", this, topic); + return CompletableFuture.completedFuture(true); + } } /** - * subscribe a client callback to a topic (keeps track of subscriptions) + * unsubscribes from a topic (keeps track of subscriptions) * * @param topic the topic * @return a CompletableFuture (exceptionally on fail) */ public CompletableFuture unsubscribe(String topic) { - subscribedTopics.remove(topic); + subscriptions.remove(topic); return internalUnsubscribe(topic).thenApply(s -> { - logger.trace("successfully unsubscribed {} from topic {}", this, topic); + logger.trace("Successfully unsubscribed {} from topic {}", this, topic); return true; }); } @@ -104,16 +110,6 @@ public abstract class MqttAsyncClientWrapper { */ public abstract CompletableFuture publish(String topic, byte[] payload, boolean retain, int qos); - /** - * check if this client is subscribed to a topic - * - * @param topic the topic - * @return true if subscribed - */ - public boolean isSubscribed(String topic) { - return subscribedTopics.contains(topic); - } - protected abstract CompletableFuture internalSubscribe(String topic, int qos, ClientCallback clientCallback); protected abstract CompletableFuture internalUnsubscribe(String topic);