move MQTT client from Paho to HiveMQ (#1125)

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
This commit is contained in:
J-N-K 2019-11-05 07:51:59 +01:00 committed by Kai Kreuzer
parent 4fef0100cb
commit 07d10dba3d
21 changed files with 793 additions and 511 deletions

View File

@ -210,11 +210,11 @@
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<!-- Paho MQTT --> <!-- HiveMQ MQTT Client-->
<dependency> <dependency>
<groupId>org.eclipse.paho</groupId> <groupId>com.hivemq</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> <artifactId>hivemq-mqtt-client</artifactId>
<version>1.2.1</version> <version>1.1.2</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>

View File

@ -450,11 +450,11 @@
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<!-- Paho MQTT --> <!-- HiveMQ MQTT client-->
<dependency> <dependency>
<groupId>org.eclipse.paho</groupId> <groupId>com.hivemq</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> <artifactId>hivemq-mqtt-client</artifactId>
<version>1.2.1</version> <version>1.1.2</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>

View File

@ -12,14 +12,12 @@
*/ */
package org.eclipse.smarthome.io.transport.mqtt; package org.eclipse.smarthome.io.transport.mqtt;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -29,29 +27,32 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils; import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.eclipse.smarthome.config.core.ConfigConstants;
import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback; import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
import org.eclipse.smarthome.io.transport.mqtt.internal.MqttActionAdapterCallback;
import org.eclipse.smarthome.io.transport.mqtt.internal.TopicSubscribers; import org.eclipse.smarthome.io.transport.mqtt.internal.TopicSubscribers;
import org.eclipse.smarthome.io.transport.mqtt.internal.client.Mqtt3AsyncClientWrapper;
import org.eclipse.smarthome.io.transport.mqtt.internal.client.Mqtt5AsyncClientWrapper;
import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy; import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy;
import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy; import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy;
import org.eclipse.smarthome.io.transport.mqtt.sslcontext.AcceptAllCertificatesSSLContext; import org.eclipse.smarthome.io.transport.mqtt.ssl.CustomTrustManagerFactory;
import org.eclipse.smarthome.io.transport.mqtt.sslcontext.CustomSSLContextProvider;
import org.eclipse.smarthome.io.transport.mqtt.sslcontext.SSLContextProvider; import org.eclipse.smarthome.io.transport.mqtt.sslcontext.SSLContextProvider;
import org.osgi.service.cm.ConfigurationException; import org.osgi.service.cm.ConfigurationException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
/** /**
* An MQTTBrokerConnection represents a single client connection to a MQTT broker. * An MQTTBrokerConnection represents a single client connection to a MQTT broker.
* *
@ -59,7 +60,8 @@ import org.slf4j.LoggerFactory;
* *
* @author Davy Vanherbergen - Initial contribution * @author Davy Vanherbergen - Initial contribution
* @author David Graeff - All operations are async now. More flexible sslContextProvider and reconnectStrategy added. * @author David Graeff - All operations are async now. More flexible sslContextProvider and reconnectStrategy added.
* @author Markus Rathgeb - Added connection state callback * @author Markus Rathgeb - added connection state callback
* @author Jan N. Klug - changed from PAHO to HiveMQ client
*/ */
@NonNullByDefault @NonNullByDefault
public class MqttBrokerConnection { public class MqttBrokerConnection {
@ -73,29 +75,39 @@ public class MqttBrokerConnection {
public enum Protocol { public enum Protocol {
TCP, TCP,
WEBSOCKETS WEBSOCKETS
}; }
/// Connection parameters /**
* MQTT version (currently v3 and v5)
*/
public enum MqttVersion {
V3,
V5
}
// Connection parameters
protected final Protocol protocol; protected final Protocol protocol;
protected final String host; protected final String host;
protected final int port; protected final int port;
protected final boolean secure; protected final boolean secure;
protected final MqttVersion mqttVersion;
private @Nullable TrustManagerFactory trustManagerFactory = InsecureTrustManagerFactory.INSTANCE;
private SSLContextProvider sslContextProvider = new CustomSSLContextProvider(trustManagerFactory);
protected final String clientId; protected final String clientId;
private @Nullable String user; private @Nullable String user;
private @Nullable String password; private @Nullable String password;
/// Configuration variables /// Configuration variables
private int qos = DEFAULT_QOS; private int qos = DEFAULT_QOS;
@Deprecated
private boolean retain = false; private boolean retain = false;
private @Nullable MqttWillAndTestament lastWill; private @Nullable MqttWillAndTestament lastWill;
private @Nullable Path persistencePath;
protected @Nullable AbstractReconnectStrategy reconnectStrategy; protected @Nullable AbstractReconnectStrategy reconnectStrategy;
private SSLContextProvider sslContextProvider = new AcceptAllCertificatesSSLContext();
private int keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL; private int keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL;
/// Runtime variables /// Runtime variables
protected @Nullable MqttAsyncClient client; protected @Nullable MqttAsyncClientWrapper client;
protected @Nullable MqttClientPersistence dataStore;
protected boolean isConnecting = false; protected boolean isConnecting = false;
protected final List<MqttConnectionObserver> connectionObservers = new CopyOnWriteArrayList<>(); protected final List<MqttConnectionObserver> connectionObservers = new CopyOnWriteArrayList<>();
@ -107,12 +119,12 @@ public class MqttBrokerConnection {
private int timeout = 1200; /* Connection timeout in milliseconds */ private int timeout = 1200; /* Connection timeout in milliseconds */
/** /**
* Create a IMqttActionListener object for being used as a callback for a connection attempt. * Create a listener object for being used as a callback for a connection attempt.
* The callback will interact with the {@link AbstractReconnectStrategy} as well as inform registered * The callback will interact with the {@link AbstractReconnectStrategy} as well as inform registered
* {@link MqttConnectionObserver}s. * {@link MqttConnectionObserver}s.
*/ */
@NonNullByDefault({}) @NonNullByDefault
public class ConnectionCallback implements IMqttActionListener { public class ConnectionCallback implements MqttClientConnectedListener, MqttClientDisconnectedListener {
private final MqttBrokerConnection connection; private final MqttBrokerConnection connection;
private final Runnable cancelTimeoutFuture; private final Runnable cancelTimeoutFuture;
private CompletableFuture<Boolean> future = new CompletableFuture<>(); private CompletableFuture<Boolean> future = new CompletableFuture<>();
@ -122,8 +134,7 @@ public class MqttBrokerConnection {
this.cancelTimeoutFuture = mqttBrokerConnectionImpl::cancelTimeoutFuture; this.cancelTimeoutFuture = mqttBrokerConnectionImpl::cancelTimeoutFuture;
} }
@Override public void onConnected(@Nullable MqttClientConnectedContext context) {
public void onSuccess(IMqttToken asyncActionToken) {
cancelTimeoutFuture.run(); cancelTimeoutFuture.run();
connection.isConnecting = false; connection.isConnecting = false;
@ -143,15 +154,21 @@ public class MqttBrokerConnection {
}); });
} }
@Override public void onDisconnected(@Nullable MqttClientDisconnectedContext context) {
public void onFailure(@Nullable IMqttToken token, @Nullable Throwable error) { if (context != null) {
cancelTimeoutFuture.run(); final Throwable throwable = context.getCause();
onDisconnected(throwable);
} else {
onDisconnected(new Throwable("unknown disconnect reason"));
}
}
final Throwable throwable = (token != null && token.getException() != null) ? token.getException() : error; public void onDisconnected(Throwable t) {
cancelTimeoutFuture.run();
final MqttConnectionState connectionState = connection.connectionState(); final MqttConnectionState connectionState = connection.connectionState();
future.complete(false); future.complete(false);
connection.connectionObservers.forEach(o -> o.connectionStateChanged(connectionState, throwable)); connection.connectionObservers.forEach(o -> o.connectionStateChanged(connectionState, t));
// If we tried to connect via start(), use the reconnect strategy to try it again // If we tried to connect via start(), use the reconnect strategy to try it again
if (connection.isConnecting) { if (connection.isConnecting) {
@ -168,15 +185,13 @@ public class MqttBrokerConnection {
} }
} }
/** Client callback object */ // Client callback object
protected ClientCallback clientCallback = new ClientCallback(this, connectionObservers, subscribers); protected ClientCallback clientCallback = new ClientCallback(this, connectionObservers, subscribers);
/** Connection callback object */ // Connection callback object
protected ConnectionCallback connectionCallback; protected ConnectionCallback connectionCallback;
/** Action callback object */
protected IMqttActionListener actionCallback = new MqttActionAdapterCallback();
/** /**
* Create a new TCP MQTT client connection to a server with the given host and port. * Create a new TCP MQTT3 client connection to a server with the given host and port.
* *
* @param host A host name or address * @param host A host name or address
* @param port A port or null to select the default port for a secure or insecure connection * @param port A port or null to select the default port for a secure or insecure connection
@ -188,11 +203,11 @@ public class MqttBrokerConnection {
* @throws IllegalArgumentException If the client id or port is not valid. * @throws IllegalArgumentException If the client id or port is not valid.
*/ */
public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, @Nullable String clientId) { public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, @Nullable String clientId) {
this(Protocol.TCP, host, port, secure, clientId); this(Protocol.TCP, MqttVersion.V3, host, port, secure, clientId);
} }
/** /**
* Create a new MQTT client connection to a server with the given protocol, host and port. * Create a new MQTT3 client connection to a server with the given protocol, mqtt client version, host and port.
* *
* @param protocol The transport protocol * @param protocol The transport protocol
* @param host A host name or address * @param host A host name or address
@ -204,14 +219,35 @@ public class MqttBrokerConnection {
* characters. * characters.
* @throws IllegalArgumentException If the client id or port is not valid. * @throws IllegalArgumentException If the client id or port is not valid.
*/ */
@Deprecated
public MqttBrokerConnection(Protocol protocol, String host, @Nullable Integer port, boolean secure, public MqttBrokerConnection(Protocol protocol, String host, @Nullable Integer port, boolean secure,
@Nullable String clientId) { @Nullable String clientId) {
this(protocol, MqttVersion.V3, host, port, secure, clientId);
}
/**
* Create a new MQTT client connection to a server with the given protocol, host and port.
*
* @param protocol The transport protocol
* @param mqttVersion The version of the MQTT client (v3 or v5)
* @param host A host name or address
* @param port A port or null to select the default port for a secure or insecure connection
* @param secure A secure connection
* @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are
* used for access restriction implementations.
* If none is specified, a default is generated. The client id cannot be longer than 65535
* characters.
* @throws IllegalArgumentException If the client id or port is not valid.
*/
public MqttBrokerConnection(Protocol protocol, MqttVersion mqttVersion, String host, @Nullable Integer port,
boolean secure, @Nullable String clientId) {
this.protocol = protocol; this.protocol = protocol;
this.host = host; this.host = host;
this.secure = secure; this.secure = secure;
this.mqttVersion = mqttVersion;
String newClientID = clientId; String newClientID = clientId;
if (newClientID == null) { if (newClientID == null) {
newClientID = MqttClient.generateClientId(); newClientID = UUID.randomUUID().toString();
} else if (newClientID.length() > 65535) { } else if (newClientID.length() > 65535) {
throw new IllegalArgumentException("Client ID cannot be longer than 65535 characters"); throw new IllegalArgumentException("Client ID cannot be longer than 65535 characters");
} }
@ -229,7 +265,7 @@ public class MqttBrokerConnection {
* state to the MQTT broker changed. * state to the MQTT broker changed.
* *
* The reconnect strategy will not be informed if the initial connection to the broker * The reconnect strategy will not be informed if the initial connection to the broker
* timed out. You need a timeout executor additionally, see {@link #setTimeoutExecutor(Executor)}. * timed out. You need a timeout executor additionally, see {@link #setTimeoutExecutor(ScheduledExecutorService, int)}.
* *
* @param reconnectStrategy The reconnect strategy. May not be null. * @param reconnectStrategy The reconnect strategy. May not be null.
*/ */
@ -257,6 +293,23 @@ public class MqttBrokerConnection {
this.timeout = timeoutInMS; this.timeout = timeoutInMS;
} }
public void setTrustManagers(TrustManager[] trustManagers) {
if (trustManagers.length != 0) {
trustManagerFactory = new CustomTrustManagerFactory(trustManagers);
} else {
trustManagerFactory = null;
}
sslContextProvider = new CustomSSLContextProvider(trustManagerFactory);
}
public TrustManager[] getTrustManagers() {
if (trustManagerFactory != null) {
return trustManagerFactory.getTrustManagers();
} else {
return new TrustManager[] {};
}
}
/** /**
* Get the MQTT broker protocol * Get the MQTT broker protocol
*/ */
@ -264,6 +317,13 @@ public class MqttBrokerConnection {
return protocol; return protocol;
} }
/**
* Get the MQTT version
*/
public MqttVersion getMqttVersion() {
return mqttVersion;
}
/** /**
* Get the MQTT broker host * Get the MQTT broker host
*/ */
@ -327,25 +387,29 @@ public class MqttBrokerConnection {
* @param qos level. * @param qos level.
*/ */
public void setQos(int qos) { public void setQos(int qos) {
if (qos >= 0 && qos <= 2) { if (qos < 0 || qos > 3) {
this.qos = qos; throw new IllegalArgumentException();
} else {
throw new IllegalArgumentException("The quality of service parameter must be >=0 and <=2.");
} }
this.qos = qos;
} }
/** /**
* use retain flags on message publish instead
*
* @return true if newly messages sent to the broker should be retained by the broker. * @return true if newly messages sent to the broker should be retained by the broker.
*/ */
@Deprecated
public boolean isRetain() { public boolean isRetain() {
return retain; return retain;
} }
/** /**
* Set whether newly published messages should be retained by the broker. * Set whether newly published messages should be retained by the broker.
* use retain flags on message publish instead
* *
* @param retain true to retain. * @param retain true to retain.
*/ */
@Deprecated
public void setRetain(boolean retain) { public void setRetain(boolean retain) {
this.retain = retain; this.retain = retain;
} }
@ -398,8 +462,8 @@ public class MqttBrokerConnection {
* *
* @param persistencePath the path that should be used to store persistent data * @param persistencePath the path that should be used to store persistent data
*/ */
@Deprecated
public void setPersistencePath(final @Nullable Path persistencePath) { public void setPersistencePath(final @Nullable Path persistencePath) {
this.persistencePath = persistencePath;
} }
/** /**
@ -418,7 +482,7 @@ public class MqttBrokerConnection {
if (isConnecting) { if (isConnecting) {
return MqttConnectionState.CONNECTING; return MqttConnectionState.CONNECTING;
} }
return (client != null && client.isConnected()) ? MqttConnectionState.CONNECTED return (client != null && client.getState().isConnected()) ? MqttConnectionState.CONNECTED
: MqttConnectionState.DISCONNECTED; : MqttConnectionState.DISCONNECTED;
} }
@ -446,6 +510,7 @@ public class MqttBrokerConnection {
/** /**
* Return the ssl context provider. * Return the ssl context provider.
*/ */
@Deprecated
public SSLContextProvider getSSLContextProvider() { public SSLContextProvider getSSLContextProvider() {
return sslContextProvider; return sslContextProvider;
} }
@ -456,8 +521,10 @@ public class MqttBrokerConnection {
* @return The ssl context provider. Should not be null, but the ssl context will in fact * @return The ssl context provider. Should not be null, but the ssl context will in fact
* only be used if a ssl:// url is given. * only be used if a ssl:// url is given.
*/ */
@Deprecated
public void setSSLContextProvider(SSLContextProvider sslContextProvider) { public void setSSLContextProvider(SSLContextProvider sslContextProvider) {
this.sslContextProvider = sslContextProvider; this.sslContextProvider = sslContextProvider;
trustManagerFactory = new CustomTrustManagerFactory(sslContextProvider);
} }
/** /**
@ -487,17 +554,19 @@ public class MqttBrokerConnection {
subscribers.put(topic, subscriberList); subscribers.put(topic, subscriberList);
subscriberList.add(subscriber); subscriberList.add(subscriber);
} }
final MqttAsyncClient client = this.client; final MqttAsyncClientWrapper client = this.client;
if (client == null) { if (client == null) {
future.completeExceptionally(new Exception("No MQTT client")); future.completeExceptionally(new Exception("No MQTT client"));
return future; return future;
} }
if (client.isConnected()) { if (client.getState().isConnected()) {
try { client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
client.subscribe(topic, qos, future, actionCallback); if (t == null) {
} catch (org.eclipse.paho.client.mqttv3.MqttException e) { future.complete(true);
future.completeExceptionally(e); } else {
} future.completeExceptionally(new MqttException(t));
}
});
} else { } else {
// The subscription will be performed on connecting. // The subscription will be performed on connecting.
future.complete(false); future.complete(false);
@ -514,16 +583,18 @@ public class MqttBrokerConnection {
protected CompletableFuture<Boolean> subscribeRaw(String topic) { protected CompletableFuture<Boolean> subscribeRaw(String topic) {
logger.trace("subscribeRaw message consumer for topic '{}' from broker '{}'", topic, host); logger.trace("subscribeRaw message consumer for topic '{}' from broker '{}'", topic, host);
CompletableFuture<Boolean> future = new CompletableFuture<>(); CompletableFuture<Boolean> future = new CompletableFuture<>();
try { final MqttAsyncClientWrapper mqttClient = this.client;
MqttAsyncClient client = this.client; if (mqttClient != null && mqttClient.getState().isConnected()) {
if (client != null && client.isConnected()) { mqttClient.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
client.subscribe(topic, qos, future, actionCallback); if (t == null) {
} else { future.complete(true);
future.complete(false); } else {
} logger.warn("Failed subscribing to topic {}", topic, t);
} catch (org.eclipse.paho.client.mqttv3.MqttException e) { future.completeExceptionally(new MqttException(t));
logger.info("Error subscribing to topic {}", topic, e); }
future.completeExceptionally(e); });
} else {
future.complete(false);
} }
return future; return future;
} }
@ -550,9 +621,9 @@ public class MqttBrokerConnection {
// Remove from subscriber list // Remove from subscriber list
subscribers.remove(topic); subscribers.remove(topic);
// No more subscribers to this topic. Unsubscribe topic on the broker // No more subscribers to this topic. Unsubscribe topic on the broker
MqttAsyncClient client = this.client; MqttAsyncClientWrapper mqttClient = this.client;
if (client != null) { if (mqttClient != null) {
return unsubscribeRaw(client, topic); return unsubscribeRaw(mqttClient, topic);
} else { } else {
return CompletableFuture.completedFuture(false); return CompletableFuture.completedFuture(false);
} }
@ -567,18 +638,19 @@ public class MqttBrokerConnection {
* @return Completes with true if successful. Completes with false if no broker connection is established. * @return Completes with true if successful. Completes with false if no broker connection is established.
* Exceptionally otherwise. * Exceptionally otherwise.
*/ */
protected CompletableFuture<Boolean> unsubscribeRaw(MqttAsyncClient client, String topic) { protected CompletableFuture<Boolean> unsubscribeRaw(MqttAsyncClientWrapper client, String topic) {
logger.trace("Unsubscribing message consumer for topic '{}' from broker '{}'", topic, host); logger.trace("Unsubscribing message consumer for topic '{}' from broker '{}'", topic, host);
CompletableFuture<Boolean> future = new CompletableFuture<>(); CompletableFuture<Boolean> future = new CompletableFuture<>();
try { if (client.getState().isConnected()) {
if (client.isConnected()) { client.unsubscribe(topic).whenComplete((s, t) -> {
client.unsubscribe(topic, future, actionCallback); if (t == null) {
} else { future.complete(true);
future.complete(false); } else {
} future.completeExceptionally(new MqttException(t));
} catch (org.eclipse.paho.client.mqttv3.MqttException e) { }
logger.info("Error unsubscribing topic from broker", e); });
future.completeExceptionally(e); } else {
return CompletableFuture.completedFuture(false);
} }
return future; return future;
} }
@ -608,33 +680,6 @@ public class MqttBrokerConnection {
return !connectionObservers.isEmpty(); return !connectionObservers.isEmpty();
} }
/**
* Create a MqttConnectOptions object using the fields of this MqttBrokerConnection instance.
* Package local, for testing.
*/
MqttConnectOptions createMqttOptions() throws ConfigurationException {
MqttConnectOptions options = new MqttConnectOptions();
if (!StringUtils.isBlank(user)) {
options.setUserName(user);
}
if (!StringUtils.isBlank(password) && password != null) {
options.setPassword(password.toCharArray());
}
if (secure) {
options.setSocketFactory(sslContextProvider.getContext().getSocketFactory());
}
if (lastWill != null) {
MqttWillAndTestament lastWill = this.lastWill; // Make eclipse happy
options.setWill(lastWill.getTopic(), lastWill.getPayload(), lastWill.getQos(), lastWill.isRetain());
}
options.setKeepAliveInterval(keepAliveInterval);
options.setHttpsHostnameVerificationEnabled(false);
return options;
}
/** /**
* This will establish a connection to the MQTT broker and if successful, notify all * This will establish a connection to the MQTT broker and if successful, notify all
* publishers and subscribers that the connection has become active. This method will * publishers and subscribers that the connection has become active. This method will
@ -661,76 +706,29 @@ public class MqttBrokerConnection {
} }
// Close client if there is still one existing // Close client if there is still one existing
if (client != null) {
try { if (this.client != null) {
client.close(); this.client.disconnect();
} catch (org.eclipse.paho.client.mqttv3.MqttException ignore) { this.client = null;
}
client = null;
} }
CompletableFuture<Boolean> future = connectionCallback.createFuture(); CompletableFuture<Boolean> future = connectionCallback.createFuture();
StringBuilder serverURI = new StringBuilder();
switch (protocol) {
case TCP:
serverURI.append(secure ? "ssl://" : "tcp://");
break;
case WEBSOCKETS:
serverURI.append(secure ? "wss://" : "ws://");
break;
default:
future.completeExceptionally(new ConfigurationException("protocol", "Protocol unknown"));
return future;
}
serverURI.append(host);
serverURI.append(":");
serverURI.append(port);
// Storage
Path persistencePath = this.persistencePath;
if (persistencePath == null) {
persistencePath = Paths.get(ConfigConstants.getUserDataFolder()).resolve("mqtt").resolve(host);
}
try {
persistencePath = Files.createDirectories(persistencePath);
} catch (IOException e) {
future.completeExceptionally(new MqttException(e));
return future;
}
MqttDefaultFilePersistence localDataStore = new MqttDefaultFilePersistence(persistencePath.toString());
// Create the client // Create the client
MqttAsyncClient localClient; MqttAsyncClientWrapper client = createClient();
try { this.client = client;
localClient = createClient(serverURI.toString(), clientId, localDataStore);
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
future.completeExceptionally(new MqttException(e));
return future;
}
// Assign to object // connect
this.client = localClient; client.connect(lastWill, keepAliveInterval, user, password);
this.dataStore = localDataStore;
// Connect logger.info("Starting MQTT broker connection to '{}' with clientid {}", host, getClientId());
localClient.setCallback(clientCallback);
try {
MqttConnectOptions mqttConnectOptions = createMqttOptions();
mqttConnectOptions.setMaxInflight(16384); // 1/4 of available message ids
localClient.connect(mqttConnectOptions, null, connectionCallback);
logger.info("Starting MQTT broker connection to '{}' with clientid {} and file store '{}'", host,
getClientId(), persistencePath);
} catch (org.eclipse.paho.client.mqttv3.MqttException | ConfigurationException e) {
future.completeExceptionally(new MqttException(e));
return future;
}
// Connect timeout // Connect timeout
ScheduledExecutorService executor = timeoutExecutor; ScheduledExecutorService executor = timeoutExecutor;
if (executor != null) { if (executor != null) {
final ScheduledFuture<?> timeoutFuture = this.timeoutFuture.getAndSet(executor.schedule( final ScheduledFuture<?> timeoutFuture = this.timeoutFuture.getAndSet(executor.schedule(
() -> connectionCallback.onFailure(null, new TimeoutException()), timeout, TimeUnit.MILLISECONDS)); () -> connectionCallback.onDisconnected(new TimeoutException("connect timed out")), timeout,
TimeUnit.MILLISECONDS));
if (timeoutFuture != null) { if (timeoutFuture != null) {
timeoutFuture.cancel(false); timeoutFuture.cancel(false);
} }
@ -738,18 +736,14 @@ public class MqttBrokerConnection {
return future; return future;
} }
/** protected MqttAsyncClientWrapper createClient() {
* Encapsulates the creation of the paho MqttAsyncClient if (mqttVersion == MqttVersion.V3) {
* return new Mqtt3AsyncClientWrapper(host, port, clientId, protocol, secure, connectionCallback,
* @param serverURI A paho uri like ssl://host:port, tcp://host:port, ws[s]://host:port trustManagerFactory);
* @param clientId the mqtt client ID } else {
* @param dataStore The datastore to save qos!=0 messages until they are delivered. return new Mqtt5AsyncClientWrapper(host, port, clientId, protocol, secure, connectionCallback,
* @return Returns a valid MqttAsyncClient trustManagerFactory);
* @throws org.eclipse.paho.client.mqttv3.MqttException }
*/
protected MqttAsyncClient createClient(String serverURI, String clientId, MqttClientPersistence dataStore)
throws org.eclipse.paho.client.mqttv3.MqttException {
return new MqttAsyncClient(serverURI, clientId, dataStore);
} }
/** /**
@ -760,20 +754,11 @@ public class MqttBrokerConnection {
* @return Returns the value of the parameter v. * @return Returns the value of the parameter v.
*/ */
protected boolean finalizeStopAfterDisconnect(boolean v) { protected boolean finalizeStopAfterDisconnect(boolean v) {
if (client != null) { final MqttAsyncClientWrapper client = this.client;
try { if (client != null && connectionState() != MqttConnectionState.DISCONNECTED) {
client.close(); client.disconnect();
} catch (Exception ignore) {
}
}
client = null;
if (dataStore != null) {
try {
dataStore.close();
} catch (Exception ignore) {
}
dataStore = null;
} }
this.client = null;
connectionObservers.forEach(o -> o.connectionStateChanged(MqttConnectionState.DISCONNECTED, null)); connectionObservers.forEach(o -> o.connectionStateChanged(MqttConnectionState.DISCONNECTED, null));
return v; return v;
} }
@ -784,7 +769,7 @@ public class MqttBrokerConnection {
* @return Returns a future that completes as soon as all subscriptions have been canceled. * @return Returns a future that completes as soon as all subscriptions have been canceled.
*/ */
public CompletableFuture<Void> unsubscribeAll() { public CompletableFuture<Void> unsubscribeAll() {
MqttAsyncClient client = this.client; MqttAsyncClientWrapper client = this.client;
List<CompletableFuture<Boolean>> futures = new ArrayList<>(); List<CompletableFuture<Boolean>> futures = new ArrayList<>();
if (client != null) { if (client != null) {
subscribers.forEach((topic, subList) -> { subscribers.forEach((topic, subList) -> {
@ -804,7 +789,7 @@ public class MqttBrokerConnection {
* @return Returns a future that completes as soon as the disconnect process has finished. * @return Returns a future that completes as soon as the disconnect process has finished.
*/ */
public CompletableFuture<Boolean> stop() { public CompletableFuture<Boolean> stop() {
MqttAsyncClient client = this.client; MqttAsyncClientWrapper client = this.client;
if (client == null) { if (client == null) {
return CompletableFuture.completedFuture(true); return CompletableFuture.completedFuture(true);
} }
@ -825,19 +810,15 @@ public class MqttBrokerConnection {
CompletableFuture<Boolean> future = new CompletableFuture<>(); CompletableFuture<Boolean> future = new CompletableFuture<>();
// Close connection // Close connection
if (client.isConnected()) { if (client.getState().isConnected()) {
// We need to thread change here. Because paho does not allow to disconnect within a callback method unsubscribeAll().thenRun(() -> {
unsubscribeAll().thenRunAsync(() -> { client.disconnect().whenComplete((m, t) -> {
try { if (t == null) {
client.disconnect(100).waitForCompletion(100); future.complete(true);
if (client.isConnected()) { } else {
client.disconnectForcibly(); future.complete(false);
} }
future.complete(true); });
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
logger.debug("Error while closing connection to broker", e);
future.complete(false);
}
}); });
} else { } else {
future.complete(true); future.complete(true);
@ -846,6 +827,18 @@ public class MqttBrokerConnection {
return future.thenApply(this::finalizeStopAfterDisconnect); return future.thenApply(this::finalizeStopAfterDisconnect);
} }
/**
* Publish a message to the broker.
*
* @param topic The topic
* @param payload The message payload
* @param listener A listener to be notified of success or failure of the delivery.
*/
@Deprecated
public void publish(String topic, byte[] payload, MqttActionCallback listener) {
publish(topic, payload, getQos(), isRetain(), listener);
}
/** /**
* Publish a message to the broker with the given QoS and retained flag. * Publish a message to the broker with the given QoS and retained flag.
* *
@ -855,30 +848,22 @@ public class MqttBrokerConnection {
* @param retain Set to true to retain the message on the broker * @param retain Set to true to retain the message on the broker
* @param listener A listener to be notified of success or failure of the delivery. * @param listener A listener to be notified of success or failure of the delivery.
*/ */
@Deprecated
public void publish(String topic, byte[] payload, int qos, boolean retain, MqttActionCallback listener) { public void publish(String topic, byte[] payload, int qos, boolean retain, MqttActionCallback listener) {
MqttAsyncClient localClient = client; final MqttAsyncClientWrapper client = this.client;
if (localClient == null) { if (client == null) {
listener.onFailure(topic, new MqttException(0)); listener.onFailure(topic, new MqttException(new Throwable()));
return; return;
} }
try {
IMqttDeliveryToken deliveryToken = localClient.publish(topic, payload, qos, retain, listener,
actionCallback);
logger.debug("Publishing message {} to topic '{}'", deliveryToken.getMessageId(), topic);
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
listener.onFailure(topic, new MqttException(e));
}
}
/** client.publish(topic, payload, retain, qos).whenComplete((m, t) -> {
* Publish a message to the broker. if (t != null) {
* listener.onFailure(topic, new MqttException(t));
* @param topic The topic } else {
* @param payload The message payload listener.onSuccess(topic);
* @param listener A listener to be notified of success or failure of the delivery. }
*/ });
public void publish(String topic, byte[] payload, MqttActionCallback listener) { logger.debug("Publishing message to topic '{}'", topic);
publish(topic, payload, qos, retain, listener);
} }
/** /**
@ -890,7 +875,7 @@ public class MqttBrokerConnection {
* exceptionally on an error or with a result of false if no broker connection is established. * exceptionally on an error or with a result of false if no broker connection is established.
*/ */
public CompletableFuture<Boolean> publish(String topic, byte[] payload) { public CompletableFuture<Boolean> publish(String topic, byte[] payload) {
return publish(topic, payload, qos, retain); return publish(topic, payload, getQos(), isRetain());
} }
/** /**
@ -900,23 +885,26 @@ public class MqttBrokerConnection {
* @param payload The message payload * @param payload The message payload
* @param qos The quality of service for this message * @param qos The quality of service for this message
* @param retain Set to true to retain the message on the broker * @param retain Set to true to retain the message on the broker
* @param listener An optional listener to be notified of success or failure of the delivery.
* @return Returns a future that completes with a result of true if the publishing succeeded and completes * @return Returns a future that completes with a result of true if the publishing succeeded and completes
* exceptionally on an error or with a result of false if no broker connection is established. * exceptionally on an error or with a result of false if no broker connection is established.
*/ */
public CompletableFuture<Boolean> publish(String topic, byte[] payload, int qos, boolean retain) { public CompletableFuture<Boolean> publish(String topic, byte[] payload, int qos, boolean retain) {
MqttAsyncClient client = this.client; final MqttAsyncClientWrapper client = this.client;
if (client == null) { if (client == null) {
return CompletableFuture.completedFuture(false); return CompletableFuture.completedFuture(false);
} }
// publish message asynchronously // publish message asynchronously
CompletableFuture<Boolean> f = new CompletableFuture<>(); CompletableFuture<Boolean> future = new CompletableFuture<>();
try { client.publish(topic, payload, retain, qos).whenComplete((m, t) -> {
client.publish(topic, payload, qos, retain, f, actionCallback); if (t == null) {
} catch (org.eclipse.paho.client.mqttv3.MqttException e) { future.complete(true);
f.completeExceptionally(new MqttException(e)); } else {
} future.completeExceptionally(new MqttException(t));
return f; }
});
return future;
} }
/** /**
@ -929,4 +917,5 @@ public class MqttBrokerConnection {
timeoutFuture.cancel(false); timeoutFuture.cancel(false);
} }
} }
} }

View File

@ -34,6 +34,7 @@ public class MqttBrokerConnectionConfig {
public @Nullable String clientID; public @Nullable String clientID;
// MQTT parameters // MQTT parameters
public Integer qos = MqttBrokerConnection.DEFAULT_QOS; public Integer qos = MqttBrokerConnection.DEFAULT_QOS;
@Deprecated
public Boolean retainMessages = false; public Boolean retainMessages = false;
/** Keepalive in seconds */ /** Keepalive in seconds */
public @Nullable Integer keepAlive; public @Nullable Integer keepAlive;

View File

@ -12,12 +12,15 @@
*/ */
package org.eclipse.smarthome.io.transport.mqtt; package org.eclipse.smarthome.io.transport.mqtt;
import org.eclipse.jdt.annotation.NonNullByDefault;
/** /**
* Thrown if an error occurs communicating with the server. The exception contains a reason code. The semantic of the * Thrown if an error occurs communicating with the server. The exception contains a reason code. The semantic of the
* reason code depends on the underlying implementation. * reason code depends on the underlying implementation.
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
*/ */
@NonNullByDefault
public class MqttException extends Exception { public class MqttException extends Exception {
private static final long serialVersionUID = 301L; private static final long serialVersionUID = 301L;
private int reasonCode; private int reasonCode;
@ -29,10 +32,22 @@ public class MqttException extends Exception {
* *
* @param reasonCode the reason code for the exception. * @param reasonCode the reason code for the exception.
*/ */
@Deprecated
public MqttException(int reasonCode) { public MqttException(int reasonCode) {
this.cause = new Exception();
this.reasonCode = reasonCode; this.reasonCode = reasonCode;
} }
/**
* Constructs a new <code>MqttException</code> with the specified reason
*
* @param reason the reason for the exception.
*/
public MqttException(String reason) {
this.cause = new Exception("reason");
this.reasonCode = Integer.MIN_VALUE;
}
/** /**
* Constructs a new <code>MqttException</code> with the specified * Constructs a new <code>MqttException</code> with the specified
* <code>Throwable</code> as the underlying reason. * <code>Throwable</code> as the underlying reason.
@ -40,12 +55,7 @@ public class MqttException extends Exception {
* @param cause the underlying cause of the exception. * @param cause the underlying cause of the exception.
*/ */
public MqttException(Throwable cause) { public MqttException(Throwable cause) {
if (cause instanceof org.eclipse.paho.client.mqttv3.MqttException) { this.reasonCode = Integer.MIN_VALUE;
org.eclipse.paho.client.mqttv3.MqttException internalException = (org.eclipse.paho.client.mqttv3.MqttException) cause;
this.reasonCode = internalException.getReasonCode();
} else {
this.reasonCode = org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_EXCEPTION;
}
this.cause = cause; this.cause = cause;
} }
@ -56,6 +66,7 @@ public class MqttException extends Exception {
* @param reason the reason code for the exception. * @param reason the reason code for the exception.
* @param cause the underlying cause of the exception. * @param cause the underlying cause of the exception.
*/ */
@Deprecated
public MqttException(int reason, Throwable cause) { public MqttException(int reason, Throwable cause) {
this.reasonCode = reason; this.reasonCode = reason;
this.cause = cause; this.cause = cause;
@ -66,6 +77,7 @@ public class MqttException extends Exception {
* *
* @return the code representing the reason for this exception. * @return the code representing the reason for this exception.
*/ */
@Deprecated
public int getReasonCode() { public int getReasonCode() {
return reasonCode; return reasonCode;
} }
@ -86,11 +98,7 @@ public class MqttException extends Exception {
*/ */
@Override @Override
public String getMessage() { public String getMessage() {
if (cause != null) { return cause.getMessage();
return cause.getMessage();
}
return "MqttException with reason " + String.valueOf(reasonCode);
} }
/** /**
@ -98,14 +106,6 @@ public class MqttException extends Exception {
*/ */
@Override @Override
public String toString() { public String toString() {
if (cause instanceof org.eclipse.paho.client.mqttv3.MqttException) { return cause.toString();
org.eclipse.paho.client.mqttv3.MqttException internalException = (org.eclipse.paho.client.mqttv3.MqttException) cause;
return internalException.toString();
}
String result = getMessage() + " (" + reasonCode + ")";
if (cause != null) {
result = result + " - " + cause.toString();
}
return result;
} }
} }

View File

@ -16,10 +16,9 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection; import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection;
import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionObserver; import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionObserver;
import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionState; import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionState;
@ -29,12 +28,17 @@ import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrate
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
/** /**
* Processes the paho MqttCallbacks for the {@link MqttBrokerConnection}. * Processes the MqttCallbacks for the {@link MqttBrokerConnection}.
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
* @author Jan N. Klug - adjusted to HiveMQ client
*/ */
public class ClientCallback implements MqttCallback { @NonNullByDefault
public class ClientCallback {
final Logger logger = LoggerFactory.getLogger(ClientCallback.class); final Logger logger = LoggerFactory.getLogger(ClientCallback.class);
private final MqttBrokerConnection connection; private final MqttBrokerConnection connection;
private final List<MqttConnectionObserver> connectionObservers; private final List<MqttConnectionObserver> connectionObservers;
@ -47,12 +51,11 @@ public class ClientCallback implements MqttCallback {
this.subscribers = subscribers; this.subscribers = subscribers;
} }
@Override
public synchronized void connectionLost(@Nullable Throwable exception) { public synchronized void connectionLost(@Nullable Throwable exception) {
if (exception instanceof MqttException) { if (exception instanceof MqttException) {
MqttException e = (MqttException) exception; MqttException e = (MqttException) exception;
logger.info("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}", connection.getHost(), logger.info("MQTT connection to '{}' was lost: {} : Cause : {}", connection.getHost(), e.getMessage(),
e.getMessage(), e.getReasonCode(), (e.getCause() == null ? "Unknown" : e.getCause().getMessage())); e.getCause().getMessage());
} else if (exception != null) { } else if (exception != null) {
logger.info("MQTT connection to '{}' was lost", connection.getHost(), exception); logger.info("MQTT connection to '{}' was lost", connection.getHost(), exception);
} }
@ -64,30 +67,33 @@ public class ClientCallback implements MqttCallback {
} }
} }
@Override public void messageArrived(Mqtt3Publish message) {
public void deliveryComplete(IMqttDeliveryToken token) { messageArrived(message.getTopic(), message.getPayloadAsBytes());
logger.trace("Message with id {} delivered.", token.getMessageId());
} }
@Override public void messageArrived(Mqtt5Publish message) {
public void messageArrived(String topic, MqttMessage message) { messageArrived(message.getTopic(), message.getPayloadAsBytes());
byte[] payload = message.getPayload(); }
private void messageArrived(MqttTopic topic, byte[] payload) {
String topicString = topic.toString();
logger.trace("Received message on topic '{}' : {}", topic, new String(payload)); logger.trace("Received message on topic '{}' : {}", topic, new String(payload));
List<MqttMessageSubscriber> matches = new ArrayList<>();
List<MqttMessageSubscriber> matchingSubscribers = new ArrayList<>();
synchronized (subscribers) { synchronized (subscribers) {
subscribers.values().forEach(subscriberList -> { subscribers.values().forEach(subscriberList -> {
if (subscriberList.topicMatch(topic)) { if (subscriberList.topicMatch(topicString)) {
logger.trace("Topic match for '{}' using regex {}", topic, subscriberList.getTopicRegexPattern()); logger.trace("Topic match for '{}' using regex {}", topic, subscriberList.getTopicRegexPattern());
subscriberList.forEach(consumer -> matches.add(consumer)); subscriberList.forEach(consumer -> matchingSubscribers.add(consumer));
} else { } else {
logger.trace("No topic match for '{}' using regex {}", topic, logger.trace("No topic match for '{}' using regex {}", topic,
subscriberList.getTopicRegexPattern()); subscriberList.getTopicRegexPattern());
} }
}); });
} }
try { try {
matches.forEach(subscriber -> subscriber.processMessage(topic, payload)); matchingSubscribers.forEach(subscriber -> subscriber.processMessage(topicString, payload));
} catch (Exception e) { } catch (Exception e) {
logger.error("MQTT message received. MqttMessageSubscriber#processMessage() implementation failure", e); logger.error("MQTT message received. MqttMessageSubscriber#processMessage() implementation failure", e);
} }

View File

@ -1,50 +0,0 @@
/**
* Copyright (c) 2010-2019 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.smarthome.io.transport.mqtt.internal;
import java.util.concurrent.CompletableFuture;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.smarthome.io.transport.mqtt.MqttActionCallback;
/**
* Create a IMqttActionListener object for being used as a callback for a publish attempt.
*
* @author David Graeff - Initial contribution
*/
public class MqttActionAdapterCallback implements IMqttActionListener {
@Override
public void onSuccess(IMqttToken token) {
if (token.getUserContext() instanceof MqttActionCallback) {
MqttActionCallback subscriber = (MqttActionCallback) token.getUserContext();
subscriber.onSuccess(token.getTopics()[0]);
} else if (token.getUserContext() instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
CompletableFuture<Boolean> future = (CompletableFuture<Boolean>) token.getUserContext();
future.complete(true);
}
}
@Override
public void onFailure(IMqttToken token, Throwable throwable) {
if (token.getUserContext() instanceof MqttActionCallback) {
MqttActionCallback subscriber = (MqttActionCallback) token.getUserContext();
subscriber.onFailure(token.getTopics()[0], throwable);
} else if (token.getUserContext() instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
CompletableFuture<Boolean> future = (CompletableFuture<Boolean>) token.getUserContext();
future.completeExceptionally(throwable);
}
}
}

View File

@ -15,7 +15,6 @@ package org.eclipse.smarthome.io.transport.mqtt.internal;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.config.core.Configuration; import org.eclipse.smarthome.config.core.Configuration;
@ -68,10 +67,11 @@ public class MqttBrokerConnectionServiceInstance {
connection.stop(); connection.stop();
} }
if (configMap == null || configMap.isEmpty() || mqttService == null) { final MqttServiceImpl service = (MqttServiceImpl) mqttService;
if (configMap == null || configMap.isEmpty() || service == null) {
return; return;
} }
final @NonNull MqttServiceImpl service = (@NonNull MqttServiceImpl) mqttService;
// Parse configuration // Parse configuration
MqttBrokerConnectionConfig config = new Configuration(configMap).as(MqttBrokerConnectionConfig.class); MqttBrokerConnectionConfig config = new Configuration(configMap).as(MqttBrokerConnectionConfig.class);
@ -79,7 +79,7 @@ public class MqttBrokerConnectionServiceInstance {
try { try {
// Compute brokerID and make sure it is not empty // Compute brokerID and make sure it is not empty
String brokerID = config.getBrokerID(); String brokerID = config.getBrokerID();
if (StringUtils.isBlank(brokerID) || brokerID == null) { if (StringUtils.isBlank(brokerID)) {
logger.warn("Ignore invalid broker connection configuration: {}", config); logger.warn("Ignore invalid broker connection configuration: {}", config);
return; return;
} }

View File

@ -0,0 +1,108 @@
/**
* Copyright (c) 2010-2019 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.smarthome.io.transport.mqtt.internal.client;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.ConnectionCallback;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.Protocol;
import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament;
import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder;
import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3Connect;
import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe;
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe;
/**
* The {@link Mqtt3AsyncClientWrapper} provides the wrapper for Mqttv3 async clients
*
* @author Jan N. Klug - Initial contribution
*/
@NonNullByDefault
public class Mqtt3AsyncClientWrapper extends MqttAsyncClientWrapper {
private final Mqtt3AsyncClient client;
public Mqtt3AsyncClientWrapper(String host, int port, String clientId, Protocol protocol, boolean secure,
ConnectionCallback connectionCallback, @Nullable TrustManagerFactory trustManagerFactory) {
Mqtt3ClientBuilder clientBuilder = Mqtt3Client.builder().serverHost(host).serverPort(port).identifier(clientId)
.addConnectedListener(connectionCallback).addDisconnectedListener(connectionCallback);
if (protocol == Protocol.WEBSOCKETS) {
clientBuilder.webSocketWithDefaultConfig();
}
if (secure) {
clientBuilder.sslWithDefaultConfig().sslConfig().trustManagerFactory(trustManagerFactory).applySslConfig();
}
client = clientBuilder.buildAsync();
};
@Override
public MqttClientState getState() {
return client.getState();
}
@Override
public CompletableFuture<?> subscribe(String topic, int qos, ClientCallback clientCallback) {
Mqtt3Subscribe subscribeMessage = Mqtt3Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos))
.build();
return client.subscribe(subscribeMessage, clientCallback::messageArrived);
}
@Override
public CompletableFuture<?> unsubscribe(String topic) {
Mqtt3Unsubscribe unsubscribeMessage = Mqtt3Unsubscribe.builder().topicFilter(topic).build();
return client.unsubscribe(unsubscribeMessage);
}
@Override
public CompletableFuture<Mqtt3Publish> publish(String topic, byte[] payload, boolean retain, int qos) {
Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic(topic).qos(getMqttQosFromInt(qos)).payload(payload)
.retain(retain).build();
return client.publish(publishMessage);
}
@Override
public CompletableFuture<?> connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval,
@Nullable String username, @Nullable String password) {
Mqtt3ConnectBuilder connectMessageBuilder = Mqtt3Connect.builder().keepAlive(keepAliveInterval);
if (lwt != null) {
Mqtt3Publish willPublish = Mqtt3Publish.builder().topic(lwt.getTopic()).payload(lwt.getPayload())
.retain(lwt.isRetain()).qos(getMqttQosFromInt(lwt.getQos())).build();
connectMessageBuilder.willPublish(willPublish);
}
if (!StringUtils.isBlank(username) && !StringUtils.isBlank(password) && password != null) {
connectMessageBuilder.simpleAuth().username(username).password(password.getBytes()).applySimpleAuth();
}
return client.connect(connectMessageBuilder.build());
}
@Override
public CompletableFuture<Void> disconnect() {
return client.disconnect();
}
}

View File

@ -0,0 +1,109 @@
/**
* Copyright (c) 2010-2019 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.smarthome.io.transport.mqtt.internal.client;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.ConnectionCallback;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.Protocol;
import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament;
import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
/**
* The {@link Mqtt5AsyncClientWrapper} provides the wrapper for Mqttv3 async clients
*
* @author Jan N. Klug - Initial contribution
*/
@NonNullByDefault
public class Mqtt5AsyncClientWrapper extends MqttAsyncClientWrapper {
private final Mqtt5AsyncClient client;
public Mqtt5AsyncClientWrapper(String host, int port, String clientId, Protocol protocol, boolean secure,
ConnectionCallback connectionCallback, @Nullable TrustManagerFactory trustManagerFactory) {
Mqtt5ClientBuilder clientBuilder = Mqtt5Client.builder().serverHost(host).serverPort(port).identifier(clientId)
.addConnectedListener(connectionCallback).addDisconnectedListener(connectionCallback);
if (protocol == Protocol.WEBSOCKETS) {
clientBuilder.webSocketWithDefaultConfig();
}
if (secure) {
clientBuilder.sslWithDefaultConfig().sslConfig().trustManagerFactory(trustManagerFactory).applySslConfig();
}
client = clientBuilder.buildAsync();
};
@Override
public MqttClientState getState() {
return client.getState();
}
@Override
public CompletableFuture<?> subscribe(String topic, int qos, ClientCallback clientCallback) {
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos))
.build();
return client.subscribe(subscribeMessage, clientCallback::messageArrived);
}
@Override
public CompletableFuture<?> unsubscribe(String topic) {
Mqtt5Unsubscribe unsubscribeMessage = Mqtt5Unsubscribe.builder().topicFilter(topic).build();
return client.unsubscribe(unsubscribeMessage);
}
@Override
public CompletableFuture<Mqtt5PublishResult> publish(String topic, byte[] payload, boolean retain, int qos) {
Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(getMqttQosFromInt(qos)).payload(payload)
.retain(retain).build();
return client.publish(publishMessage);
}
@Override
public CompletableFuture<?> connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval,
@Nullable String username, @Nullable String password) {
Mqtt5ConnectBuilder connectMessageBuilder = Mqtt5Connect.builder().keepAlive(keepAliveInterval);
if (lwt != null) {
Mqtt5Publish willPublish = Mqtt5Publish.builder().topic(lwt.getTopic()).payload(lwt.getPayload())
.retain(lwt.isRetain()).qos(getMqttQosFromInt(lwt.getQos())).build();
connectMessageBuilder.willPublish(willPublish);
}
if (!StringUtils.isBlank(username) && !StringUtils.isBlank(password) && password != null) {
connectMessageBuilder.simpleAuth().username(username).password(password.getBytes()).applySimpleAuth();
}
return client.connect(connectMessageBuilder.build());
}
@Override
public CompletableFuture<Void> disconnect() {
return client.disconnect();
}
}

View File

@ -0,0 +1,58 @@
/**
* Copyright (c) 2010-2019 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.smarthome.io.transport.mqtt.internal.client;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament;
import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.datatypes.MqttQos;
/**
* The {@link AbstractMqttAsyncClient} is the base class for async client wrappers
*
* @author Jan N. Klug - Initial contribution
*/
@NonNullByDefault
public abstract class MqttAsyncClientWrapper {
public abstract CompletableFuture<?> connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval,
@Nullable String username, @Nullable String password);
public abstract CompletableFuture<Void> disconnect();
public abstract MqttClientState getState();
public abstract CompletableFuture<?> subscribe(String topic, int qos, ClientCallback clientCallback);
public abstract CompletableFuture<?> unsubscribe(String topic);
public abstract CompletableFuture<?> publish(String topic, byte[] payload, boolean retain, int qos);
protected MqttQos getMqttQosFromInt(int qos) {
switch (qos) {
case 0:
return MqttQos.AT_LEAST_ONCE;
case 1:
return MqttQos.AT_MOST_ONCE;
case 2:
return MqttQos.EXACTLY_ONCE;
default:
throw new IllegalArgumentException("QoS needs to be 0, 1 or 2.");
}
}
}

View File

@ -0,0 +1,85 @@
/**
* Copyright (c) 2010-2019 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.smarthome.io.transport.mqtt.ssl;
import java.lang.reflect.Field;
import java.security.KeyStore;
import javax.net.ssl.ManagerFactoryParameters;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.io.transport.mqtt.sslcontext.SSLContextProvider;
import org.osgi.service.cm.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SimpleTrustManagerFactory;
/**
* The {@link CustomTrustManagerFactory} is a TrustManagerFactory that provides a custom {@link TrustManager}
*
* @author Jan N. Klug - Initial contribution
*/
@NonNullByDefault
public class CustomTrustManagerFactory extends SimpleTrustManagerFactory {
private final Logger logger = LoggerFactory.getLogger(CustomTrustManagerFactory.class);
private final TrustManager[] trustManagers;
public CustomTrustManagerFactory(TrustManager[] trustManagers) {
this.trustManagers = trustManagers;
}
@Deprecated
public CustomTrustManagerFactory(SSLContextProvider contextProvider) {
TrustManager[] tm;
try {
SSLContext ctx = contextProvider.getContext();
// get SSLContextImpl
Field contextSpiField = ctx.getClass().getDeclaredField("contextSpi");
contextSpiField.setAccessible(true);
Object sslContextImpl = contextSpiField.get(ctx);
Class<?> sslContextImplClass = sslContextImpl.getClass().getSuperclass().getSuperclass();
// get trustmanager
Field trustManagerField = sslContextImplClass.getDeclaredField("trustManager");
trustManagerField.setAccessible(true);
Object trustManagerObj = trustManagerField.get(sslContextImpl);
tm = new TrustManager[] { (X509TrustManager) trustManagerObj };
} catch (IllegalAccessException | NoSuchFieldException | ConfigurationException e) {
logger.warn("using default insecure trustmanager, could not extract trustmanager from SSL context:", e);
tm = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();
}
trustManagers = tm;
}
@Override
protected void engineInit(@Nullable KeyStore keyStore) throws Exception {
}
@Override
protected void engineInit(@Nullable ManagerFactoryParameters managerFactoryParameters) throws Exception {
}
@Override
protected TrustManager[] engineGetTrustManagers() {
return trustManagers;
}
}

View File

@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
*/ */
@Deprecated
public class AcceptAllCertificatesSSLContext implements SSLContextProvider { public class AcceptAllCertificatesSSLContext implements SSLContextProvider {
private final Logger logger = LoggerFactory.getLogger(AcceptAllCertificatesSSLContext.class); private final Logger logger = LoggerFactory.getLogger(AcceptAllCertificatesSSLContext.class);

View File

@ -0,0 +1,56 @@
/**
* Copyright (c) 2010-2019 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.smarthome.io.transport.mqtt.sslcontext;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.osgi.service.cm.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This SSLContextProvider returns an {@link SSLContext} that accepts all connections and doesn't perform any
* certificate validations. This implementation forces a TLS v1.2 {@link SSLContext} instance.
*
* @author Jan N. Klug - Initial contribution
*/
@Deprecated
public class CustomSSLContextProvider implements SSLContextProvider {
private final Logger logger = LoggerFactory.getLogger(CustomSSLContextProvider.class);
private final TrustManagerFactory factory;
public CustomSSLContextProvider(TrustManagerFactory factory) {
this.factory = factory;
}
@Override
public SSLContext getContext() throws ConfigurationException {
try {
if (factory == null) {
return SSLContext.getDefault();
} else {
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, factory.getTrustManagers(), null);
return sslContext;
}
} catch (KeyManagementException | NoSuchAlgorithmException e) {
logger.warn("SSL configuration failed", e);
throw new ConfigurationException("ssl", e.getMessage());
}
}
}

View File

@ -24,6 +24,7 @@ import org.osgi.service.cm.ConfigurationException;
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
*/ */
@Deprecated
public interface SSLContextProvider { public interface SSLContextProvider {
/** /**
* Return an {@link SSLContext} to be used by secure Mqtt broker connections. Never return null here. If you are not * Return an {@link SSLContext} to be used by secure Mqtt broker connections. Never return null here. If you are not

View File

@ -1,134 +0,0 @@
/**
* Copyright (c) 2010-2019 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.smarthome.io.transport.mqtt;
import static org.mockito.Mockito.*;
import java.util.Collections;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/**
* We need an extended MqttAsyncClientEx to overwrite the connection state.
*
* In respect to the success flags the operations publish, subscribe, unsubscribe, connect,
* disconnect immediately succeed or fail.
*
* @author David Graeff - Initial contribution
*/
public class MqttAsyncClientEx extends MqttAsyncClient {
public MqttBrokerConnectionEx connection;
public MqttAsyncClientEx(String serverURI, String clientId, MqttClientPersistence dataStore,
MqttBrokerConnectionEx connection) throws MqttException {
super(serverURI, clientId, dataStore);
this.connection = connection;
}
IMqttToken getToken(Object userContext, IMqttActionListener callback, String topic) {
IMqttToken t = mock(IMqttToken.class);
doReturn(userContext).when(t).getUserContext();
doReturn(true).when(t).isComplete();
doReturn(Collections.singletonList(topic).toArray(new String[1])).when(t).getTopics();
doReturn(MqttAsyncClientEx.this).when(t).getClient();
doReturn(callback).when(t).getActionCallback();
doReturn(null).when(t).getException();
return t;
}
IMqttDeliveryToken getDeliveryToken(Object userContext, IMqttActionListener callback, String topic) {
IMqttDeliveryToken t = mock(IMqttDeliveryToken.class);
doReturn(userContext).when(t).getUserContext();
doReturn(true).when(t).isComplete();
doReturn(Collections.singletonList(topic).toArray(new String[1])).when(t).getTopics();
doReturn(MqttAsyncClientEx.this).when(t).getClient();
doReturn(callback).when(t).getActionCallback();
doReturn(null).when(t).getException();
return t;
}
@Override
public boolean isConnected() {
return connection.connectionStateOverwrite == MqttConnectionState.CONNECTED;
}
@Override
public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext,
IMqttActionListener callback) throws MqttException, MqttPersistenceException {
if (connection.publishSuccess) {
callback.onSuccess(getToken(userContext, callback, topic));
} else {
callback.onFailure(getToken(userContext, callback, topic), new MqttException(0));
}
return getDeliveryToken(userContext, callback, topic);
}
@Override
public IMqttToken subscribe(String topic, int qos, Object userContext, IMqttActionListener callback)
throws MqttException {
if (connection.publishSuccess) {
callback.onSuccess(getToken(userContext, callback, topic));
} else {
callback.onFailure(getToken(userContext, callback, topic), new MqttException(0));
}
return getToken(userContext, callback, topic);
}
@Override
public IMqttToken unsubscribe(String topic, Object userContext, IMqttActionListener callback) throws MqttException {
if (connection.unsubscribeSuccess) {
callback.onSuccess(getToken(userContext, callback, topic));
} else {
callback.onFailure(getToken(userContext, callback, topic), new MqttException(0));
}
return getToken(userContext, callback, topic);
}
@Override
public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback)
throws MqttException {
connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
if (callback != null) {
if (connection.disconnectSuccess) {
callback.onSuccess(getToken(userContext, callback, null));
} else {
callback.onFailure(getToken(userContext, callback, null), new MqttException(0));
}
}
return getToken(userContext, callback, null);
}
@Override
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
throws MqttException, MqttSecurityException {
if (!connection.connectTimeout) {
connection.connectionStateOverwrite = MqttConnectionState.CONNECTED;
if (connection.connectSuccess) {
callback.onSuccess(getToken(userContext, callback, null));
} else {
callback.onFailure(getToken(userContext, callback, null), new MqttException(0));
}
} else {
connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
}
return getToken(userContext, callback, null);
}
}

View File

@ -12,22 +12,27 @@
*/ */
package org.eclipse.smarthome.io.transport.mqtt; package org.eclipse.smarthome.io.transport.mqtt;
import static org.mockito.Mockito.spy; import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import com.hivemq.client.mqtt.MqttClientState;
/** /**
* We need an extended MqttBrokerConnection to overwrite the protected `connectionCallbacks` with * We need an extended MqttBrokerConnection to overwrite the protected `connectionCallbacks` with
* an instance that takes the mocked version of `MqttBrokerConnection` and overwrite the connection state. * an instance that takes the mocked version of `MqttBrokerConnection` and overwrite the connection state.
* *
* We also replace the internal MqttAsyncClient with a spied one, that in respect to the success flags * We also mock the internal Mqtt3AsyncClient that in respect to the success flags
* immediately succeed or fail with publish, subscribe, unsubscribe, connect, disconnect. * immediately succeed or fail with publish, subscribe, unsubscribe, connect, disconnect.
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
* @author Jan N. Klug - adjusted to HiveMQ client
*/ */
@NonNullByDefault @NonNullByDefault
public class MqttBrokerConnectionEx extends MqttBrokerConnection { public class MqttBrokerConnectionEx extends MqttBrokerConnection {
@ -48,9 +53,50 @@ public class MqttBrokerConnectionEx extends MqttBrokerConnection {
} }
@Override @Override
protected MqttAsyncClient createClient(String serverURI, String clientId, MqttClientPersistence dataStore) protected MqttAsyncClientWrapper createClient() {
throws org.eclipse.paho.client.mqttv3.MqttException { MqttAsyncClientWrapper mockedClient = mock(MqttAsyncClientWrapper.class);
return spy(new MqttAsyncClientEx(serverURI, clientId, dataStore, this)); // connect
doAnswer(i -> {
if (!connectTimeout) {
connectionCallback.onConnected(null);
connectionStateOverwrite = MqttConnectionState.CONNECTED;
return CompletableFuture.completedFuture(null);
}
return new CompletableFuture<Boolean>();
}).when(mockedClient).connect(any(), anyInt(), any(), any());
doAnswer(i -> {
if (disconnectSuccess) {
connectionCallback.onDisconnected(new Throwable("disconnect"));
connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
return CompletableFuture.completedFuture(null);
}
return new CompletableFuture<Boolean>();
}).when(mockedClient).disconnect();
// subscribe
doAnswer(i -> {
if (subscribeSuccess) {
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new Throwable("subscription failed"));
return future;
}
}).when(mockedClient).subscribe(any(), anyInt(), any());
// unsubscribe
doAnswer(i -> {
if (unsubscribeSuccess) {
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new Throwable("unsubscription failed"));
return future;
}
}).when(mockedClient).unsubscribe(any());
// state
doAnswer(i -> {
return MqttClientState.CONNECTED;
}).when(mockedClient).getState();
return mockedClient;
} }
@Override @Override

View File

@ -28,22 +28,25 @@ import java.util.concurrent.TimeoutException;
import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy; import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy;
import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy; import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy;
import org.junit.Test; import org.junit.Test;
import org.osgi.service.cm.ConfigurationException; import org.osgi.service.cm.ConfigurationException;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
/** /**
* Tests the MqttBrokerConnection class * Tests the MqttBrokerConnection class
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
* @author Jan N. Klug - adjusted to HiveMQ client
*/ */
public class MqttBrokerConnectionTests { public class MqttBrokerConnectionTests {
@Test @Test
public void subscribeBeforeOnlineThenConnect() throws ConfigurationException, MqttException, InterruptedException, public void subscribeBeforeOnlineThenConnect()
ExecutionException, TimeoutException, org.eclipse.paho.client.mqttv3.MqttException { throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false,
"MqttBrokerConnectionTests"); "MqttBrokerConnectionTests");
@ -55,14 +58,16 @@ public class MqttBrokerConnectionTests {
assertTrue(connection.hasSubscribers()); assertTrue(connection.hasSubscribers());
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)); assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("homie/device123/$name").payload("hello".getBytes())
.build();
// Test if subscription is active // Test if subscription is active
connection.clientCallback.messageArrived("homie/device123/$name", new MqttMessage("hello".getBytes())); connection.clientCallback.messageArrived(publishMessage);
verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes())); verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
} }
@Test @Test
public void subscribeToWildcardTopic() throws ConfigurationException, MqttException, InterruptedException, public void subscribeToWildcardTopic()
ExecutionException, TimeoutException, org.eclipse.paho.client.mqttv3.MqttException { throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false,
"MqttBrokerConnectionTests"); "MqttBrokerConnectionTests");
@ -80,7 +85,9 @@ public class MqttBrokerConnectionTests {
assertTrue(connection.hasSubscribers()); assertTrue(connection.hasSubscribers());
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)); assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
connection.clientCallback.messageArrived("homie/device123/$name", new MqttMessage("hello".getBytes())); Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("homie/device123/$name").payload("hello".getBytes())
.build();
connection.clientCallback.messageArrived(publishMessage);
verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes())); verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
verify(subscriber2).processMessage(eq("homie/device123/$name"), eq("hello".getBytes())); verify(subscriber2).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
@ -88,8 +95,8 @@ public class MqttBrokerConnectionTests {
} }
@Test @Test
public void subscriber() throws ConfigurationException, MqttException, InterruptedException, ExecutionException, public void subscriber()
TimeoutException, org.eclipse.paho.client.mqttv3.MqttException { throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false,
"MqttBrokerConnectionTests"); "MqttBrokerConnectionTests");
@ -116,7 +123,7 @@ public class MqttBrokerConnectionTests {
// Add subscriber (while connected) // Add subscriber (while connected)
CompletableFuture<Boolean> future = connection.subscribe("topic", subscriber); CompletableFuture<Boolean> future = connection.subscribe("topic", subscriber);
verify(connection.client).subscribe(any(), anyInt(), any(), any()); verify(connection.client).subscribe(any(), anyInt(), any());
assertTrue(future.get(200, TimeUnit.MILLISECONDS)); assertTrue(future.get(200, TimeUnit.MILLISECONDS));
// Remove subscriber (while connected) // Remove subscriber (while connected)
@ -155,11 +162,9 @@ public class MqttBrokerConnectionTests {
// Fake a disconnect // Fake a disconnect
connection.setReconnectStrategy(mockPolicy); connection.setReconnectStrategy(mockPolicy);
doReturn(MqttConnectionState.DISCONNECTED).when(connection).connectionState(); doReturn(MqttConnectionState.DISCONNECTED).when(connection).connectionState();
IMqttToken token = mock(IMqttToken.class);
when(token.getException()).thenReturn(new org.eclipse.paho.client.mqttv3.MqttException(1));
connection.isConnecting = true; /* Pretend that start did something */ connection.isConnecting = true; /* Pretend that start did something */
connection.connectionCallback.onFailure(token, null); connection.connectionCallback.onDisconnected(new Throwable("disconnected"));
// Check lostConnect // Check lostConnect
verify(mockPolicy).lostConnection(); verify(mockPolicy).lostConnection();
@ -168,7 +173,7 @@ public class MqttBrokerConnectionTests {
assertTrue(mockPolicy.isReconnecting()); assertTrue(mockPolicy.isReconnecting());
// Fake connection established // Fake connection established
connection.connectionCallback.onSuccess(token); connection.connectionCallback.onConnected(null);
assertFalse(mockPolicy.isReconnecting()); assertFalse(mockPolicy.isReconnecting());
} }
@ -221,8 +226,8 @@ public class MqttBrokerConnectionTests {
CompletableFuture<Boolean> future = connection.start(); CompletableFuture<Boolean> future = connection.start();
verify(connection.connectionCallback).createFuture(); verify(connection.connectionCallback).createFuture();
verify(connection.connectionCallback, times(0)).onSuccess(any()); verify(connection.connectionCallback, times(0)).onConnected(any());
verify(connection.connectionCallback, times(0)).onFailure(any(), any()); verify(connection.connectionCallback, times(0)).onDisconnected(any(MqttClientDisconnectedContext.class));
assertNotNull(connection.timeoutFuture); assertNotNull(connection.timeoutFuture);
assertThat(future.get(70, TimeUnit.MILLISECONDS), is(false)); assertThat(future.get(70, TimeUnit.MILLISECONDS), is(false));
@ -245,17 +250,13 @@ public class MqttBrokerConnectionTests {
// Cause a success callback // Cause a success callback
connection.connectionStateOverwrite = MqttConnectionState.CONNECTED; connection.connectionStateOverwrite = MqttConnectionState.CONNECTED;
connection.connectionCallback.onSuccess(null); connection.connectionCallback.onConnected(null);
verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.CONNECTED), isNull()); verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.CONNECTED), isNull());
// Cause a failure callback with a mocked token Exception testException = new Exception("test message");
IMqttToken token = mock(IMqttToken.class);
org.eclipse.paho.client.mqttv3.MqttException testException = new org.eclipse.paho.client.mqttv3.MqttException(
1);
when(token.getException()).thenReturn(testException);
connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED; connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
connection.connectionCallback.onFailure(token, null); connection.connectionCallback.onDisconnected(testException);
verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.DISCONNECTED), verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.DISCONNECTED),
eq(testException)); eq(testException));
@ -318,7 +319,6 @@ public class MqttBrokerConnectionTests {
assertEquals(1, connection.getQos()); assertEquals(1, connection.getQos());
// Check for default ssl context provider and reconnect policy // Check for default ssl context provider and reconnect policy
assertNotNull(connection.getSSLContextProvider());
assertNotNull(connection.getReconnectStrategy()); assertNotNull(connection.getReconnectStrategy());
assertThat(connection.connectionState(), equalTo(MqttConnectionState.DISCONNECTED)); assertThat(connection.connectionState(), equalTo(MqttConnectionState.DISCONNECTED));
@ -326,8 +326,8 @@ public class MqttBrokerConnectionTests {
@SuppressWarnings("null") @SuppressWarnings("null")
@Test @Test
public void gracefulStop() throws ConfigurationException, MqttException, InterruptedException, ExecutionException, public void gracefulStop()
TimeoutException, org.eclipse.paho.client.mqttv3.MqttException { throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = spy( MqttBrokerConnectionEx connection = spy(
new MqttBrokerConnectionEx("123.123.123.123", null, false, "MqttBrokerConnectionTests")); new MqttBrokerConnectionEx("123.123.123.123", null, false, "MqttBrokerConnectionTests"));
@ -340,7 +340,7 @@ public class MqttBrokerConnectionTests {
assertThat(connection.hasSubscribers(), is(true)); assertThat(connection.hasSubscribers(), is(true));
// Let's observe the internal connection client // Let's observe the internal connection client
MqttAsyncClientEx client = (MqttAsyncClientEx) connection.client; MqttAsyncClientWrapper client = connection.client;
// Stop // Stop
CompletableFuture<Boolean> future = connection.stop(); CompletableFuture<Boolean> future = connection.stop();
@ -353,7 +353,7 @@ public class MqttBrokerConnectionTests {
future.get(1000, TimeUnit.MILLISECONDS); future.get(1000, TimeUnit.MILLISECONDS);
verify(connection).unsubscribeAll(); verify(connection).unsubscribeAll();
verify(client).disconnect(anyLong(), any(), any()); verify(client).disconnect();
// Subscribers should be removed // Subscribers should be removed
assertThat(connection.hasSubscribers(), is(false)); assertThat(connection.hasSubscribers(), is(false));

View File

@ -40,7 +40,7 @@ public class MqttServiceTests {
service.addBrokersListener(observer); service.addBrokersListener(observer);
assertTrue(service.hasBrokerObservers()); assertTrue(service.hasBrokerObservers());
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("tcp://123.123.123.123", null, false, MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false,
"brokerConnectionListenerTests"); "brokerConnectionListenerTests");
assertTrue(service.addBrokerConnection("name", connection)); assertTrue(service.addBrokerConnection("name", connection));

View File

@ -215,8 +215,8 @@
<feature name="openhab-core-io-transport-mqtt" version="${project.version}"> <feature name="openhab-core-io-transport-mqtt" version="${project.version}">
<feature>openhab-core-base</feature> <feature>openhab-core-base</feature>
<requirement>openhab.tp;filter:="(feature=paho)"</requirement> <requirement>openhab.tp;filter:="(feature=hivemqclient)"</requirement>
<feature dependency="true">openhab.tp-paho</feature> <feature dependency="true">openhab.tp-hivemqclient</feature>
<bundle>mvn:org.openhab.core.bundles/org.openhab.core.io.transport.mqtt/${project.version}</bundle> <bundle>mvn:org.openhab.core.bundles/org.openhab.core.io.transport.mqtt/${project.version}</bundle>
</feature> </feature>

View File

@ -57,6 +57,18 @@
<bundle>mvn:org.eclipse.orbit.bundles/com.google.gson/2.8.2.v20180104-1110</bundle> <bundle>mvn:org.eclipse.orbit.bundles/com.google.gson/2.8.2.v20180104-1110</bundle>
</feature> </feature>
<feature name="openhab.tp-hivemqclient" description="MQTT Client" version="${project.version}">
<capability>openhab.tp;feature=hivemqclient;version=1.1.1</capability>
<feature prerequisite="true">wrap</feature>
<feature dependency="true">openhab.tp-netty</feature>
<bundle dependency="true">mvn:org.jctools/jctools-core/2.1.2</bundle>
<bundle dependency="true">mvn:io.reactivex.rxjava2/rxjava/2.2.5</bundle>
<bundle dependency="true">mvn:org.reactivestreams/reactive-streams/1.0.2</bundle>
<bundle dependency="true">wrap:mvn:com.google.dagger/dagger/2.20</bundle>
<bundle dependency="true">mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.javax-inject/1_2</bundle>
<bundle>mvn:com.hivemq/hivemq-mqtt-client/1.1.2</bundle>
</feature>
<feature name="openhab.tp-httpclient" version="${project.version}"> <feature name="openhab.tp-httpclient" version="${project.version}">
<capability>openhab.tp;feature=httpclient;version=${jetty.version}</capability> <capability>openhab.tp;feature=httpclient;version=${jetty.version}</capability>
<bundle dependency="true">mvn:javax.servlet/javax.servlet-api/3.1.0</bundle> <bundle dependency="true">mvn:javax.servlet/javax.servlet-api/3.1.0</bundle>
@ -190,12 +202,6 @@
<bundle dependency="true">mvn:org.mapdb/mapdb/1.0.9</bundle> <bundle dependency="true">mvn:org.mapdb/mapdb/1.0.9</bundle>
</feature> </feature>
<feature name="openhab.tp-paho" description="MQTT v3 Client" version="${project.version}">
<capability>openhab.tp;feature=paho;version=1.2.1</capability>
<bundle>mvn:org.eclipse.paho/org.eclipse.paho.client.mqttv3/1.2.1</bundle>
</feature>
<feature name="openhab.tp-serial-javacomm" version="${project.version}"> <feature name="openhab.tp-serial-javacomm" version="${project.version}">
<capability>openhab.tp;feature=serial;impl=javacomm</capability> <capability>openhab.tp;feature=serial;impl=javacomm</capability>
<bundle>mvn:org.eclipse.kura/org.eclipse.soda.dk.comm/1.2.201</bundle> <bundle>mvn:org.eclipse.kura/org.eclipse.soda.dk.comm/1.2.201</bundle>