make MQTT subscription tracking thread-safe (#1261)

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
This commit is contained in:
J-N-K 2019-12-08 22:10:23 +01:00 committed by Kai Kreuzer
parent d7972f2ea8
commit cb5838837b
2 changed files with 28 additions and 45 deletions

View File

@ -18,12 +18,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.TrustManager;
@ -107,8 +102,7 @@ public class MqttBrokerConnection {
protected @Nullable MqttAsyncClientWrapper client;
protected boolean isConnecting = false;
protected final List<MqttConnectionObserver> connectionObservers = new CopyOnWriteArrayList<>();
protected final Map<String, TopicSubscribers> subscribers = new HashMap<>();
protected final Map<String, TopicSubscribers> subscribers = new ConcurrentHashMap<>();
// Connection timeout handling
protected final AtomicReference<@Nullable ScheduledFuture<?>> timeoutFuture = new AtomicReference<>(null);
@ -560,21 +554,14 @@ public class MqttBrokerConnection {
return future;
}
if (client.getState().isConnected()) {
if (!client.isSubscribed(topic)) {
// "real" subscription for first subscriber
client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
if (t == null) {
logger.trace("Subscribed {} to topic {}", subscriber, topic);
future.complete(true);
} else {
future.completeExceptionally(new MqttException(t));
}
});
} else {
// client already subscribed
logger.trace("Subscribed {} to topic {} (follow-up subscription)", subscriber, topic);
future.complete(true);
}
client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
if (t == null) {
future.complete(true);
} else {
logger.warn("Failed subscribing to topic {}", topic, t);
future.completeExceptionally(new MqttException(t));
}
});
} else {
// The subscription will be performed on connecting.
future.complete(false);
@ -634,7 +621,7 @@ public class MqttBrokerConnection {
// No more subscribers to this topic. Unsubscribe topic on the broker
MqttAsyncClientWrapper mqttClient = this.client;
if (mqttClient != null) {
logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from connection",
logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from client",
subscriber, topic);
return unsubscribeRaw(mqttClient, topic);
} else {

View File

@ -12,9 +12,9 @@
*/
package org.eclipse.smarthome.io.transport.mqtt.internal.client;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
@NonNullByDefault
public abstract class MqttAsyncClientWrapper {
private final Logger logger = LoggerFactory.getLogger(MqttAsyncClientWrapper.class);
private final List<String> subscribedTopics = new ArrayList<>();
private final Set<ClientCallback> subscriptions = ConcurrentHashMap.newKeySet();
/**
* connect this client
@ -72,23 +72,29 @@ public abstract class MqttAsyncClientWrapper {
* @return a CompletableFuture (exceptionally on fail)
*/
public CompletableFuture<Boolean> subscribe(String topic, int qos, ClientCallback clientCallback) {
subscribedTopics.add(topic);
return internalSubscribe(topic, qos, clientCallback).thenApply(s -> {
logger.trace("successfully subscribed {} to topic {}", this, topic);
return true;
});
boolean needsSubscription = subscriptions.add(clientCallback);
if (needsSubscription) {
logger.trace("Trying to subscribe {} to topic {}", this, topic);
return internalSubscribe(topic, qos, clientCallback).thenApply(s -> {
logger.trace("Successfully subscribed {} to topic {}", this, topic);
return true;
});
} else {
logger.trace("{} already subscribed to topic {}", this, topic);
return CompletableFuture.completedFuture(true);
}
}
/**
* subscribe a client callback to a topic (keeps track of subscriptions)
* unsubscribes from a topic (keeps track of subscriptions)
*
* @param topic the topic
* @return a CompletableFuture (exceptionally on fail)
*/
public CompletableFuture<Boolean> unsubscribe(String topic) {
subscribedTopics.remove(topic);
subscriptions.remove(topic);
return internalUnsubscribe(topic).thenApply(s -> {
logger.trace("successfully unsubscribed {} from topic {}", this, topic);
logger.trace("Successfully unsubscribed {} from topic {}", this, topic);
return true;
});
}
@ -104,16 +110,6 @@ public abstract class MqttAsyncClientWrapper {
*/
public abstract CompletableFuture<?> publish(String topic, byte[] payload, boolean retain, int qos);
/**
* check if this client is subscribed to a topic
*
* @param topic the topic
* @return true if subscribed
*/
public boolean isSubscribed(String topic) {
return subscribedTopics.contains(topic);
}
protected abstract CompletableFuture<?> internalSubscribe(String topic, int qos, ClientCallback clientCallback);
protected abstract CompletableFuture<?> internalUnsubscribe(String topic);