mirror of
https://github.com/danieldemus/openhab-core.git
synced 2025-01-25 11:45:49 +01:00
[mqtt] Keep track of retained messages
Closes #1279 Signed-off-by: Jochen Klein <git@jochen.susca.de>
This commit is contained in:
parent
18d0a52d02
commit
cc3f1407bd
@ -17,7 +17,13 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.net.ssl.TrustManager;
|
||||
@ -25,8 +31,7 @@ import javax.net.ssl.TrustManagerFactory;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.io.transport.mqtt.internal.ClientCallback;
|
||||
import org.openhab.core.io.transport.mqtt.internal.TopicSubscribers;
|
||||
import org.openhab.core.io.transport.mqtt.internal.Subscription;
|
||||
import org.openhab.core.io.transport.mqtt.internal.client.Mqtt3AsyncClientWrapper;
|
||||
import org.openhab.core.io.transport.mqtt.internal.client.Mqtt5AsyncClientWrapper;
|
||||
import org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
|
||||
@ -103,7 +108,7 @@ public class MqttBrokerConnection {
|
||||
protected @Nullable MqttAsyncClientWrapper client;
|
||||
protected boolean isConnecting = false;
|
||||
protected final List<MqttConnectionObserver> connectionObservers = new CopyOnWriteArrayList<>();
|
||||
protected final Map<String, TopicSubscribers> subscribers = new ConcurrentHashMap<>();
|
||||
protected final Map<String, Subscription> subscribers = new ConcurrentHashMap<>();
|
||||
|
||||
// Connection timeout handling
|
||||
protected final AtomicReference<@Nullable ScheduledFuture<?>> timeoutFuture = new AtomicReference<>(null);
|
||||
@ -126,6 +131,7 @@ public class MqttBrokerConnection {
|
||||
this.cancelTimeoutFuture = mqttBrokerConnectionImpl::cancelTimeoutFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnected(@Nullable MqttClientConnectedContext context) {
|
||||
cancelTimeoutFuture.run();
|
||||
|
||||
@ -134,8 +140,8 @@ public class MqttBrokerConnection {
|
||||
connection.reconnectStrategy.connectionEstablished();
|
||||
}
|
||||
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
|
||||
connection.subscribers.forEach((topic, subscriberList) -> {
|
||||
futures.add(connection.subscribeRaw(topic));
|
||||
connection.subscribers.forEach((topic, subscription) -> {
|
||||
futures.add(connection.subscribeRaw(topic, subscription));
|
||||
});
|
||||
|
||||
// As soon as all subscriptions are performed, turn the connection future complete.
|
||||
@ -146,6 +152,7 @@ public class MqttBrokerConnection {
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnected(@Nullable MqttClientDisconnectedContext context) {
|
||||
if (context != null) {
|
||||
final Throwable throwable = context.getCause();
|
||||
@ -178,8 +185,6 @@ public class MqttBrokerConnection {
|
||||
}
|
||||
}
|
||||
|
||||
// Client callback object
|
||||
protected ClientCallback clientCallback = new ClientCallback(this, connectionObservers, subscribers);
|
||||
// Connection callback object
|
||||
protected ConnectionCallback connectionCallback;
|
||||
|
||||
@ -542,31 +547,20 @@ public class MqttBrokerConnection {
|
||||
* @return Completes with true if successful. Completes with false if not connected yet. Exceptionally otherwise.
|
||||
*/
|
||||
public CompletableFuture<Boolean> subscribe(String topic, MqttMessageSubscriber subscriber) {
|
||||
CompletableFuture<Boolean> future = new CompletableFuture<>();
|
||||
final Subscription subscription;
|
||||
final boolean needsSubscribe;
|
||||
synchronized (subscribers) {
|
||||
TopicSubscribers subscriberList = subscribers.getOrDefault(topic, new TopicSubscribers(topic));
|
||||
subscribers.put(topic, subscriberList);
|
||||
subscriberList.add(subscriber);
|
||||
subscription = subscribers.computeIfAbsent(topic, t -> new Subscription());
|
||||
|
||||
needsSubscribe = subscription.isEmpty();
|
||||
|
||||
subscription.add(subscriber);
|
||||
}
|
||||
final MqttAsyncClientWrapper client = this.client;
|
||||
if (client == null) {
|
||||
future.completeExceptionally(new Exception("No MQTT client"));
|
||||
return future;
|
||||
|
||||
if (needsSubscribe) {
|
||||
return subscribeRaw(topic, subscription);
|
||||
}
|
||||
if (client.getState().isConnected()) {
|
||||
client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
|
||||
if (t == null) {
|
||||
future.complete(true);
|
||||
} else {
|
||||
logger.warn("Failed subscribing to topic {}", topic, t);
|
||||
future.completeExceptionally(new MqttException(t));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// The subscription will be performed on connecting.
|
||||
future.complete(false);
|
||||
}
|
||||
return future;
|
||||
return CompletableFuture.completedFuture(true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -575,13 +569,14 @@ public class MqttBrokerConnection {
|
||||
* @param topic The topic to subscribe to.
|
||||
* @return Completes with true if successful. Exceptionally otherwise.
|
||||
*/
|
||||
protected CompletableFuture<Boolean> subscribeRaw(String topic) {
|
||||
protected CompletableFuture<Boolean> subscribeRaw(String topic, Subscription subscription) {
|
||||
logger.trace("subscribeRaw message consumer for topic '{}' from broker '{}'", topic, host);
|
||||
CompletableFuture<Boolean> future = new CompletableFuture<>();
|
||||
final MqttAsyncClientWrapper mqttClient = this.client;
|
||||
if (mqttClient != null && mqttClient.getState().isConnected()) {
|
||||
mqttClient.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
|
||||
mqttClient.subscribe(topic, qos, subscription).whenComplete((s, t) -> {
|
||||
if (t == null) {
|
||||
logger.trace("Successfully subscribed to topic {}", topic);
|
||||
future.complete(true);
|
||||
} else {
|
||||
logger.warn("Failed subscribing to topic {}", topic, t);
|
||||
@ -604,30 +599,32 @@ public class MqttBrokerConnection {
|
||||
*/
|
||||
@SuppressWarnings({ "null", "unused" })
|
||||
public CompletableFuture<Boolean> unsubscribe(String topic, MqttMessageSubscriber subscriber) {
|
||||
final boolean needsUnsubscribe;
|
||||
|
||||
synchronized (subscribers) {
|
||||
final @Nullable List<MqttMessageSubscriber> list = subscribers.get(topic);
|
||||
if (list == null) {
|
||||
final @Nullable Subscription subscription = subscribers.get(topic);
|
||||
if (subscription == null) {
|
||||
logger.trace("Tried to unsubscribe {} from topic {}, but subscriber list is empty", subscriber, topic);
|
||||
return CompletableFuture.completedFuture(true);
|
||||
}
|
||||
list.remove(subscriber);
|
||||
if (!list.isEmpty()) {
|
||||
logger.trace("Removed {} from topic subscribers for topic {}, but other subscribers present",
|
||||
subscriber, topic);
|
||||
return CompletableFuture.completedFuture(true);
|
||||
subscription.remove(subscriber);
|
||||
|
||||
if (subscription.isEmpty()) {
|
||||
needsUnsubscribe = true;
|
||||
subscribers.remove(topic);
|
||||
} else {
|
||||
needsUnsubscribe = false;
|
||||
}
|
||||
// Remove from subscriber list
|
||||
subscribers.remove(topic);
|
||||
// No more subscribers to this topic. Unsubscribe topic on the broker
|
||||
}
|
||||
if (needsUnsubscribe) {
|
||||
MqttAsyncClientWrapper mqttClient = this.client;
|
||||
if (mqttClient != null) {
|
||||
logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from client",
|
||||
subscriber, topic);
|
||||
return unsubscribeRaw(mqttClient, topic);
|
||||
} else {
|
||||
return CompletableFuture.completedFuture(false);
|
||||
}
|
||||
}
|
||||
return CompletableFuture.completedFuture(true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -772,7 +769,7 @@ public class MqttBrokerConnection {
|
||||
MqttAsyncClientWrapper client = this.client;
|
||||
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
|
||||
if (client != null) {
|
||||
subscribers.forEach((topic, subList) -> {
|
||||
subscribers.forEach((topic, subscription) -> {
|
||||
futures.add(unsubscribeRaw(client, topic));
|
||||
});
|
||||
subscribers.clear();
|
||||
|
@ -1,101 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2020 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.io.transport.mqtt.internal;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
|
||||
import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
|
||||
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
|
||||
import org.openhab.core.io.transport.mqtt.MqttException;
|
||||
import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
|
||||
import org.openhab.core.io.transport.mqtt.reconnect.AbstractReconnectStrategy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.hivemq.client.mqtt.datatypes.MqttTopic;
|
||||
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
|
||||
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
|
||||
|
||||
/**
|
||||
* Processes the MqttCallbacks for the {@link MqttBrokerConnection}.
|
||||
*
|
||||
* @author David Graeff - Initial contribution
|
||||
* @author Jan N. Klug - adjusted to HiveMQ client
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class ClientCallback {
|
||||
final Logger logger = LoggerFactory.getLogger(ClientCallback.class);
|
||||
private final MqttBrokerConnection connection;
|
||||
private final List<MqttConnectionObserver> connectionObservers;
|
||||
private final Map<String, TopicSubscribers> subscribers;
|
||||
|
||||
public ClientCallback(MqttBrokerConnection mqttBrokerConnectionImpl,
|
||||
List<MqttConnectionObserver> connectionObservers, Map<String, TopicSubscribers> subscribers) {
|
||||
this.connection = mqttBrokerConnectionImpl;
|
||||
this.connectionObservers = connectionObservers;
|
||||
this.subscribers = subscribers;
|
||||
}
|
||||
|
||||
public synchronized void connectionLost(@Nullable Throwable exception) {
|
||||
if (exception instanceof MqttException) {
|
||||
MqttException e = (MqttException) exception;
|
||||
logger.info("MQTT connection to '{}' was lost: {} : Cause : {}", connection.getHost(), e.getMessage(),
|
||||
e.getCause().getMessage());
|
||||
} else if (exception != null) {
|
||||
logger.info("MQTT connection to '{}' was lost", connection.getHost(), exception);
|
||||
}
|
||||
|
||||
connectionObservers.forEach(o -> o.connectionStateChanged(MqttConnectionState.DISCONNECTED, exception));
|
||||
AbstractReconnectStrategy reconnectStrategy = connection.getReconnectStrategy();
|
||||
if (reconnectStrategy != null) {
|
||||
reconnectStrategy.lostConnection();
|
||||
}
|
||||
}
|
||||
|
||||
public void messageArrived(Mqtt3Publish message) {
|
||||
messageArrived(message.getTopic(), message.getPayloadAsBytes());
|
||||
}
|
||||
|
||||
public void messageArrived(Mqtt5Publish message) {
|
||||
messageArrived(message.getTopic(), message.getPayloadAsBytes());
|
||||
}
|
||||
|
||||
private void messageArrived(MqttTopic topic, byte[] payload) {
|
||||
String topicString = topic.toString();
|
||||
logger.trace("Received message on topic '{}' : {}", topic, new String(payload));
|
||||
|
||||
List<MqttMessageSubscriber> matchingSubscribers = new ArrayList<>();
|
||||
synchronized (subscribers) {
|
||||
subscribers.values().forEach(subscriberList -> {
|
||||
if (subscriberList.topicMatch(topicString)) {
|
||||
logger.trace("Topic match for '{}' using regex {}", topic, subscriberList.getTopicRegexPattern());
|
||||
subscriberList.forEach(consumer -> matchingSubscribers.add(consumer));
|
||||
} else {
|
||||
logger.trace("No topic match for '{}' using regex {}", topic,
|
||||
subscriberList.getTopicRegexPattern());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
matchingSubscribers.forEach(subscriber -> subscriber.processMessage(topicString, payload));
|
||||
} catch (Exception e) {
|
||||
logger.error("MQTT message received. MqttMessageSubscriber#processMessage() implementation failure", e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2020 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.io.transport.mqtt.internal;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
|
||||
|
||||
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
|
||||
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
|
||||
|
||||
/**
|
||||
* This class keeps track of all the subscribers to a specific topic.
|
||||
* <p>
|
||||
* <b>Retained</b> messages for the topic are stored so they can be replayed to new subscribers.
|
||||
*
|
||||
* @author Jochen Klein - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class Subscription {
|
||||
private final Map<String, byte[]> retainedMessages = new ConcurrentHashMap<>();
|
||||
private final Collection<MqttMessageSubscriber> subscribers = ConcurrentHashMap.newKeySet();
|
||||
|
||||
/**
|
||||
* Add a new subscriber.
|
||||
* <p>
|
||||
* If there are any retained messages, they will be delivered to the subscriber.
|
||||
*
|
||||
* @param subscriber
|
||||
*/
|
||||
public void add(MqttMessageSubscriber subscriber) {
|
||||
if (subscribers.add(subscriber)) {
|
||||
// new subscriber. deliver all known retained messages
|
||||
retainedMessages.entrySet().parallelStream()
|
||||
.forEach(entry -> processMessage(subscriber, entry.getKey(), entry.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a subscriber from the list.
|
||||
*
|
||||
* @param subscriber
|
||||
*/
|
||||
public void remove(MqttMessageSubscriber subscriber) {
|
||||
subscribers.remove(subscriber);
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return subscribers.isEmpty();
|
||||
}
|
||||
|
||||
public void messageArrived(Mqtt3Publish message) {
|
||||
messageArrived(message.getTopic().toString(), message.getPayloadAsBytes(), message.isRetain());
|
||||
}
|
||||
|
||||
public void messageArrived(Mqtt5Publish message) {
|
||||
messageArrived(message.getTopic().toString(), message.getPayloadAsBytes(), message.isRetain());
|
||||
}
|
||||
|
||||
public void messageArrived(String topic, byte[] payload, boolean retain) {
|
||||
if (retain) {
|
||||
if (payload.length > 0) {
|
||||
retainedMessages.put(topic, payload);
|
||||
} else {
|
||||
retainedMessages.remove(topic);
|
||||
}
|
||||
}
|
||||
subscribers.parallelStream().forEach(subscriber -> processMessage(subscriber, topic, payload));
|
||||
}
|
||||
|
||||
private void processMessage(MqttMessageSubscriber subscriber, String topic, byte[] payload) {
|
||||
subscriber.processMessage(topic, payload);
|
||||
}
|
||||
}
|
@ -1,68 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2020 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.io.transport.mqtt.internal;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
|
||||
|
||||
/**
|
||||
* A list of all subscribers for a given topic. This object also stores a regex pattern for the topic, where
|
||||
* the MQTT wildcards and special regex-characters got replaced.
|
||||
*
|
||||
* @author David Graeff - Initial contribution
|
||||
* @author Jan N. Klug - refactored for special cases and performance
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class TopicSubscribers extends ArrayList<MqttMessageSubscriber> {
|
||||
private static final long serialVersionUID = -2969599983479371961L;
|
||||
|
||||
// matches all regex-special characters "(){}[].*$^" (i.e. all except + and \)
|
||||
private static final Pattern REPLACE_SPECIAL_CHAR_PATTERN = Pattern
|
||||
.compile("([\\(\\)\\{\\}\\[\\]\\.\\*\\$\\^]{1})");
|
||||
|
||||
private final Pattern topicRegexPattern;
|
||||
|
||||
public TopicSubscribers(String topic) {
|
||||
// replace special characters
|
||||
String patternString = REPLACE_SPECIAL_CHAR_PATTERN.matcher(topic).replaceAll("\\\\$1");
|
||||
|
||||
// replace single-level-wildcard (+) and multi-level-wildcard (#)
|
||||
patternString = StringUtils.replace(patternString, "+", "[^/]*");
|
||||
patternString = StringUtils.replace(patternString, "#", ".*");
|
||||
|
||||
this.topicRegexPattern = Pattern.compile(patternString);
|
||||
}
|
||||
|
||||
/**
|
||||
* check if topic matches this subscriber list
|
||||
*
|
||||
* @param topic a string representing the topic
|
||||
* @return true if matches
|
||||
*/
|
||||
public boolean topicMatch(String topic) {
|
||||
return topicRegexPattern.matcher(topic).matches();
|
||||
}
|
||||
|
||||
/**
|
||||
* get the regex matching pattern of this subcriber list
|
||||
*
|
||||
* @return the pattern as a string
|
||||
*/
|
||||
public String getTopicRegexPattern() {
|
||||
return topicRegexPattern.pattern();
|
||||
}
|
||||
}
|
@ -22,7 +22,7 @@ import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.ConnectionCallback;
|
||||
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
|
||||
import org.openhab.core.io.transport.mqtt.MqttWillAndTestament;
|
||||
import org.openhab.core.io.transport.mqtt.internal.ClientCallback;
|
||||
import org.openhab.core.io.transport.mqtt.internal.Subscription;
|
||||
|
||||
import com.hivemq.client.mqtt.MqttClientState;
|
||||
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
|
||||
@ -64,14 +64,14 @@ public class Mqtt3AsyncClientWrapper extends MqttAsyncClientWrapper {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<?> internalSubscribe(String topic, int qos, ClientCallback clientCallback) {
|
||||
public CompletableFuture<?> subscribe(String topic, int qos, Subscription subscription) {
|
||||
Mqtt3Subscribe subscribeMessage = Mqtt3Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos))
|
||||
.build();
|
||||
return client.subscribe(subscribeMessage, clientCallback::messageArrived);
|
||||
return client.subscribe(subscribeMessage, subscription::messageArrived);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<?> internalUnsubscribe(String topic) {
|
||||
public CompletableFuture<?> unsubscribe(String topic) {
|
||||
Mqtt3Unsubscribe unsubscribeMessage = Mqtt3Unsubscribe.builder().topicFilter(topic).build();
|
||||
return client.unsubscribe(unsubscribeMessage);
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.ConnectionCallback;
|
||||
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
|
||||
import org.openhab.core.io.transport.mqtt.MqttWillAndTestament;
|
||||
import org.openhab.core.io.transport.mqtt.internal.ClientCallback;
|
||||
import org.openhab.core.io.transport.mqtt.internal.Subscription;
|
||||
|
||||
import com.hivemq.client.mqtt.MqttClientState;
|
||||
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
|
||||
@ -65,14 +65,14 @@ public class Mqtt5AsyncClientWrapper extends MqttAsyncClientWrapper {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<?> internalSubscribe(String topic, int qos, ClientCallback clientCallback) {
|
||||
public CompletableFuture<?> subscribe(String topic, int qos, Subscription subscription) {
|
||||
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos))
|
||||
.build();
|
||||
return client.subscribe(subscribeMessage, clientCallback::messageArrived);
|
||||
return client.subscribe(subscribeMessage, subscription::messageArrived);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<?> internalUnsubscribe(String topic) {
|
||||
public CompletableFuture<?> unsubscribe(String topic) {
|
||||
Mqtt5Unsubscribe unsubscribeMessage = Mqtt5Unsubscribe.builder().topicFilter(topic).build();
|
||||
return client.unsubscribe(unsubscribeMessage);
|
||||
}
|
||||
|
@ -12,16 +12,12 @@
|
||||
*/
|
||||
package org.openhab.core.io.transport.mqtt.internal.client;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.io.transport.mqtt.MqttWillAndTestament;
|
||||
import org.openhab.core.io.transport.mqtt.internal.ClientCallback;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.openhab.core.io.transport.mqtt.internal.Subscription;
|
||||
|
||||
import com.hivemq.client.mqtt.MqttClientState;
|
||||
import com.hivemq.client.mqtt.datatypes.MqttQos;
|
||||
@ -34,9 +30,6 @@ import com.hivemq.client.mqtt.datatypes.MqttQos;
|
||||
|
||||
@NonNullByDefault
|
||||
public abstract class MqttAsyncClientWrapper {
|
||||
private final Logger logger = LoggerFactory.getLogger(MqttAsyncClientWrapper.class);
|
||||
private final Set<String> subscriptions = ConcurrentHashMap.newKeySet();
|
||||
|
||||
/**
|
||||
* connect this client
|
||||
*
|
||||
@ -63,42 +56,6 @@ public abstract class MqttAsyncClientWrapper {
|
||||
*/
|
||||
public abstract MqttClientState getState();
|
||||
|
||||
/**
|
||||
* subscribe a client callback to a topic (keeps track of subscriptions)
|
||||
*
|
||||
* @param topic the topic
|
||||
* @param qos QoS for this subscription
|
||||
* @param clientCallback the client callback that need
|
||||
* @return a CompletableFuture (exceptionally on fail)
|
||||
*/
|
||||
public CompletableFuture<Boolean> subscribe(String topic, int qos, ClientCallback clientCallback) {
|
||||
boolean needsSubscription = subscriptions.add(topic);
|
||||
if (needsSubscription) {
|
||||
logger.trace("Trying to subscribe {} to topic {}", this, topic);
|
||||
return internalSubscribe(topic, qos, clientCallback).thenApply(s -> {
|
||||
logger.trace("Successfully subscribed {} to topic {}", this, topic);
|
||||
return true;
|
||||
});
|
||||
} else {
|
||||
logger.trace("{} already subscribed to topic {}", this, topic);
|
||||
return CompletableFuture.completedFuture(true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* unsubscribes from a topic (keeps track of subscriptions)
|
||||
*
|
||||
* @param topic the topic
|
||||
* @return a CompletableFuture (exceptionally on fail)
|
||||
*/
|
||||
public CompletableFuture<Boolean> unsubscribe(String topic) {
|
||||
subscriptions.remove(topic);
|
||||
return internalUnsubscribe(topic).thenApply(s -> {
|
||||
logger.trace("Successfully unsubscribed {} from topic {}", this, topic);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* publish a message
|
||||
*
|
||||
@ -110,9 +67,23 @@ public abstract class MqttAsyncClientWrapper {
|
||||
*/
|
||||
public abstract CompletableFuture<?> publish(String topic, byte[] payload, boolean retain, int qos);
|
||||
|
||||
protected abstract CompletableFuture<?> internalSubscribe(String topic, int qos, ClientCallback clientCallback);
|
||||
/**
|
||||
* subscribe a client callback to a topic
|
||||
*
|
||||
* @param topic the topic
|
||||
* @param qos QoS for this subscription
|
||||
* @param subscription the subscription which keeps track of subscribers and retained messages
|
||||
* @return a CompletableFuture (exceptionally on fail)
|
||||
*/
|
||||
public abstract CompletableFuture<?> subscribe(String topic, int qos, Subscription subscription);
|
||||
|
||||
protected abstract CompletableFuture<?> internalUnsubscribe(String topic);
|
||||
/**
|
||||
* unsubscribes from a topic
|
||||
*
|
||||
* @param topic the topic
|
||||
* @return a CompletableFuture (exceptionally on fail)
|
||||
*/
|
||||
public abstract CompletableFuture<?> unsubscribe(String topic);
|
||||
|
||||
protected MqttQos getMqttQosFromInt(int qos) {
|
||||
switch (qos) {
|
||||
|
@ -20,6 +20,8 @@ import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManager;
|
||||
import javax.net.ssl.X509TrustManager;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.osgi.service.cm.ConfigurationException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -31,6 +33,7 @@ import org.slf4j.LoggerFactory;
|
||||
* @author David Graeff - Initial contribution
|
||||
*/
|
||||
@Deprecated
|
||||
@NonNullByDefault
|
||||
public class AcceptAllCertificatesSSLContext implements SSLContextProvider {
|
||||
private final Logger logger = LoggerFactory.getLogger(AcceptAllCertificatesSSLContext.class);
|
||||
|
||||
@ -41,11 +44,11 @@ public class AcceptAllCertificatesSSLContext implements SSLContextProvider {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkClientTrusted(X509Certificate[] certs, String authType) {
|
||||
public void checkClientTrusted(X509Certificate @Nullable [] certs, @Nullable String authType) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkServerTrusted(X509Certificate[] certs, String authType) {
|
||||
public void checkServerTrusted(X509Certificate @Nullable [] certs, @Nullable String authType) {
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -18,6 +18,8 @@ import java.security.NoSuchAlgorithmException;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.osgi.service.cm.ConfigurationException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -29,11 +31,12 @@ import org.slf4j.LoggerFactory;
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@Deprecated
|
||||
@NonNullByDefault
|
||||
public class CustomSSLContextProvider implements SSLContextProvider {
|
||||
private final Logger logger = LoggerFactory.getLogger(CustomSSLContextProvider.class);
|
||||
private final TrustManagerFactory factory;
|
||||
private final @Nullable TrustManagerFactory factory;
|
||||
|
||||
public CustomSSLContextProvider(TrustManagerFactory factory) {
|
||||
public CustomSSLContextProvider(@Nullable TrustManagerFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@ package org.openhab.core.io.transport.mqtt.sslcontext;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
|
||||
import org.osgi.service.cm.ConfigurationException;
|
||||
|
||||
@ -25,6 +26,7 @@ import org.osgi.service.cm.ConfigurationException;
|
||||
* @author David Graeff - Initial contribution
|
||||
*/
|
||||
@Deprecated
|
||||
@NonNullByDefault
|
||||
public interface SSLContextProvider {
|
||||
/**
|
||||
* Return an {@link SSLContext} to be used by secure Mqtt broker connections. Never return null here. If you are not
|
||||
|
@ -15,11 +15,12 @@ package org.openhab.core.io.transport.mqtt;
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNull;
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.io.transport.mqtt.internal.Subscription;
|
||||
import org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
|
||||
|
||||
import com.hivemq.client.mqtt.MqttClientState;
|
||||
@ -48,6 +49,10 @@ public class MqttBrokerConnectionEx extends MqttBrokerConnection {
|
||||
super(host, port, secure, clientId);
|
||||
}
|
||||
|
||||
public Map<String, Subscription> getSubscribers() {
|
||||
return subscribers;
|
||||
}
|
||||
|
||||
void setConnectionCallback(MqttBrokerConnectionEx o) {
|
||||
connectionCallback = spy(new ConnectionCallback(o));
|
||||
}
|
||||
@ -100,7 +105,7 @@ public class MqttBrokerConnectionEx extends MqttBrokerConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull MqttConnectionState connectionState() {
|
||||
public MqttConnectionState connectionState() {
|
||||
return connectionStateOverwrite;
|
||||
}
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNull;
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.junit.Test;
|
||||
import org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
|
||||
@ -44,6 +44,7 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
|
||||
* @author David Graeff - Initial contribution
|
||||
* @author Jan N. Klug - adjusted to HiveMQ client
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class MqttBrokerConnectionTests extends JavaTest {
|
||||
@Test
|
||||
public void subscribeBeforeOnlineThenConnect()
|
||||
@ -62,7 +63,7 @@ public class MqttBrokerConnectionTests extends JavaTest {
|
||||
Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("homie/device123/$name").payload("hello".getBytes())
|
||||
.build();
|
||||
// Test if subscription is active
|
||||
connection.clientCallback.messageArrived(publishMessage);
|
||||
connection.getSubscribers().get("homie/device123/$name").messageArrived(publishMessage);
|
||||
verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
|
||||
}
|
||||
|
||||
@ -88,7 +89,9 @@ public class MqttBrokerConnectionTests extends JavaTest {
|
||||
|
||||
Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("homie/device123/$name").payload("hello".getBytes())
|
||||
.build();
|
||||
connection.clientCallback.messageArrived(publishMessage);
|
||||
connection.getSubscribers().get("homie/device123/+").messageArrived(publishMessage);
|
||||
connection.getSubscribers().get("#").messageArrived(publishMessage);
|
||||
connection.getSubscribers().get("homie/#").messageArrived(publishMessage);
|
||||
|
||||
verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
|
||||
verify(subscriber2).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
|
||||
@ -194,7 +197,7 @@ public class MqttBrokerConnectionTests extends JavaTest {
|
||||
|
||||
MqttConnectionObserver o = new MqttConnectionObserver() {
|
||||
@Override
|
||||
public void connectionStateChanged(@NonNull MqttConnectionState state, @Nullable Throwable error) {
|
||||
public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
|
||||
if (state == MqttConnectionState.DISCONNECTED) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
|
||||
@ -29,6 +30,7 @@ import org.osgi.service.cm.ConfigurationException;
|
||||
*
|
||||
* @author David Graeff - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class MqttServiceTests {
|
||||
// Tests addBrokersListener/removeBrokersListener
|
||||
@Test
|
||||
|
@ -1,129 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2020 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.io.transport.mqtt.internal;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests for the {@link TopicSubscriber} class
|
||||
*
|
||||
* Checks for several topics that the matcher is as expected and the original topic is really matched
|
||||
*
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
public class TopicSubscriberTests {
|
||||
|
||||
@Test
|
||||
public void simpleTopic() {
|
||||
String testTopic = "foo/bar";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/bar", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whiteSpaceTopic() {
|
||||
String testTopic = "foo/b ar";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/b ar", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void specialCharacterStarTopic() {
|
||||
String testTopic = "foo/bar*";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/bar\\*", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void specialCharacterDollarTopic() {
|
||||
String testTopic = "foo/$bar";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/\\$bar", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void specialCharacterCaretTopic() {
|
||||
String testTopic = "foo/b^ar";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/b\\^ar", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void specialCharacterDotTopic() {
|
||||
String testTopic = "foo/ba.r";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/ba\\.r", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void specialCharacterParenthesesTopic() {
|
||||
String testTopic = "foo/b(a)r";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/b\\(a\\)r", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void specialCharacterBracesTopic() {
|
||||
String testTopic = "foo/b{a}r";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/b\\{a\\}r", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void specialCharacterSquareBracketsTopic() {
|
||||
String testTopic = "foo/b[a]r";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/b\\[a\\]r", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch(testTopic));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void singleLevelWildcardTopic() {
|
||||
String testTopic = "foo/+/bar";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/[^/]*/bar", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch("foo/silly/bar"));
|
||||
assertFalse(subscribers.topicMatch("foo/silly/two/bar"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void multiLevelWildcardTopic() {
|
||||
String testTopic = "foo/#";
|
||||
TopicSubscribers subscribers = new TopicSubscribers(testTopic);
|
||||
|
||||
assertEquals("foo/.*", subscribers.getTopicRegexPattern());
|
||||
assertTrue(subscribers.topicMatch("foo/bar"));
|
||||
assertTrue(subscribers.topicMatch("foo/silly/bar"));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user