From 07d10dba3d73349ce2f302a6393e255ee4de788d Mon Sep 17 00:00:00 2001 From: J-N-K Date: Tue, 5 Nov 2019 07:51:59 +0100 Subject: [PATCH] move MQTT client from Paho to HiveMQ (#1125) Signed-off-by: Jan N. Klug --- bom/compile/pom.xml | 8 +- bom/runtime/pom.xml | 8 +- .../transport/mqtt/MqttBrokerConnection.java | 451 +++++++++--------- .../mqtt/MqttBrokerConnectionConfig.java | 1 + .../io/transport/mqtt/MqttException.java | 40 +- .../mqtt/internal/ClientCallback.java | 44 +- .../internal/MqttActionAdapterCallback.java | 50 -- .../MqttBrokerConnectionServiceInstance.java | 8 +- .../client/Mqtt3AsyncClientWrapper.java | 108 +++++ .../client/Mqtt5AsyncClientWrapper.java | 109 +++++ .../client/MqttAsyncClientWrapper.java | 58 +++ .../mqtt/ssl/CustomTrustManagerFactory.java | 85 ++++ .../AcceptAllCertificatesSSLContext.java | 1 + .../sslcontext/CustomSSLContextProvider.java | 56 +++ .../mqtt/sslcontext/SSLContextProvider.java | 1 + .../io/transport/mqtt/MqttAsyncClientEx.java | 134 ------ .../mqtt/MqttBrokerConnectionEx.java | 60 ++- .../mqtt/MqttBrokerConnectionTests.java | 58 +-- .../mqtt/internal/MqttServiceTests.java | 2 +- .../openhab-core/src/main/feature/feature.xml | 4 +- .../openhab-tp/src/main/feature/feature.xml | 18 +- 21 files changed, 793 insertions(+), 511 deletions(-) delete mode 100644 bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttActionAdapterCallback.java create mode 100644 bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java create mode 100644 bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java create mode 100644 bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java create mode 100644 bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/ssl/CustomTrustManagerFactory.java create mode 100644 bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/CustomSSLContextProvider.java delete mode 100644 bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttAsyncClientEx.java diff --git a/bom/compile/pom.xml b/bom/compile/pom.xml index be5020c83..2ad3a2caf 100644 --- a/bom/compile/pom.xml +++ b/bom/compile/pom.xml @@ -210,11 +210,11 @@ compile - + - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.2.1 + com.hivemq + hivemq-mqtt-client + 1.1.2 compile diff --git a/bom/runtime/pom.xml b/bom/runtime/pom.xml index d3259f1eb..1d11be372 100644 --- a/bom/runtime/pom.xml +++ b/bom/runtime/pom.xml @@ -450,11 +450,11 @@ compile - + - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.2.1 + com.hivemq + hivemq-mqtt-client + 1.1.2 compile diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java index de9f10de5..946c3a415 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java @@ -12,14 +12,12 @@ */ package org.eclipse.smarthome.io.transport.mqtt; -import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -29,29 +27,32 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.lang.StringUtils; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; + import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; -import org.eclipse.paho.client.mqttv3.IMqttActionListener; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.IMqttToken; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.eclipse.smarthome.config.core.ConfigConstants; import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback; -import org.eclipse.smarthome.io.transport.mqtt.internal.MqttActionAdapterCallback; import org.eclipse.smarthome.io.transport.mqtt.internal.TopicSubscribers; +import org.eclipse.smarthome.io.transport.mqtt.internal.client.Mqtt3AsyncClientWrapper; +import org.eclipse.smarthome.io.transport.mqtt.internal.client.Mqtt5AsyncClientWrapper; +import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper; import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy; import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy; -import org.eclipse.smarthome.io.transport.mqtt.sslcontext.AcceptAllCertificatesSSLContext; +import org.eclipse.smarthome.io.transport.mqtt.ssl.CustomTrustManagerFactory; +import org.eclipse.smarthome.io.transport.mqtt.sslcontext.CustomSSLContextProvider; import org.eclipse.smarthome.io.transport.mqtt.sslcontext.SSLContextProvider; import org.osgi.service.cm.ConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext; +import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener; + +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + /** * An MQTTBrokerConnection represents a single client connection to a MQTT broker. * @@ -59,7 +60,8 @@ import org.slf4j.LoggerFactory; * * @author Davy Vanherbergen - Initial contribution * @author David Graeff - All operations are async now. More flexible sslContextProvider and reconnectStrategy added. - * @author Markus Rathgeb - Added connection state callback + * @author Markus Rathgeb - added connection state callback + * @author Jan N. Klug - changed from PAHO to HiveMQ client */ @NonNullByDefault public class MqttBrokerConnection { @@ -73,29 +75,39 @@ public class MqttBrokerConnection { public enum Protocol { TCP, WEBSOCKETS - }; + } - /// Connection parameters + /** + * MQTT version (currently v3 and v5) + */ + public enum MqttVersion { + V3, + V5 + } + + // Connection parameters protected final Protocol protocol; protected final String host; protected final int port; protected final boolean secure; + protected final MqttVersion mqttVersion; + + private @Nullable TrustManagerFactory trustManagerFactory = InsecureTrustManagerFactory.INSTANCE; + private SSLContextProvider sslContextProvider = new CustomSSLContextProvider(trustManagerFactory); protected final String clientId; private @Nullable String user; private @Nullable String password; /// Configuration variables private int qos = DEFAULT_QOS; + @Deprecated private boolean retain = false; private @Nullable MqttWillAndTestament lastWill; - private @Nullable Path persistencePath; protected @Nullable AbstractReconnectStrategy reconnectStrategy; - private SSLContextProvider sslContextProvider = new AcceptAllCertificatesSSLContext(); private int keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL; /// Runtime variables - protected @Nullable MqttAsyncClient client; - protected @Nullable MqttClientPersistence dataStore; + protected @Nullable MqttAsyncClientWrapper client; protected boolean isConnecting = false; protected final List connectionObservers = new CopyOnWriteArrayList<>(); @@ -107,12 +119,12 @@ public class MqttBrokerConnection { private int timeout = 1200; /* Connection timeout in milliseconds */ /** - * Create a IMqttActionListener object for being used as a callback for a connection attempt. + * Create a listener object for being used as a callback for a connection attempt. * The callback will interact with the {@link AbstractReconnectStrategy} as well as inform registered * {@link MqttConnectionObserver}s. */ - @NonNullByDefault({}) - public class ConnectionCallback implements IMqttActionListener { + @NonNullByDefault + public class ConnectionCallback implements MqttClientConnectedListener, MqttClientDisconnectedListener { private final MqttBrokerConnection connection; private final Runnable cancelTimeoutFuture; private CompletableFuture future = new CompletableFuture<>(); @@ -122,8 +134,7 @@ public class MqttBrokerConnection { this.cancelTimeoutFuture = mqttBrokerConnectionImpl::cancelTimeoutFuture; } - @Override - public void onSuccess(IMqttToken asyncActionToken) { + public void onConnected(@Nullable MqttClientConnectedContext context) { cancelTimeoutFuture.run(); connection.isConnecting = false; @@ -143,15 +154,21 @@ public class MqttBrokerConnection { }); } - @Override - public void onFailure(@Nullable IMqttToken token, @Nullable Throwable error) { - cancelTimeoutFuture.run(); + public void onDisconnected(@Nullable MqttClientDisconnectedContext context) { + if (context != null) { + final Throwable throwable = context.getCause(); + onDisconnected(throwable); + } else { + onDisconnected(new Throwable("unknown disconnect reason")); + } + } - final Throwable throwable = (token != null && token.getException() != null) ? token.getException() : error; + public void onDisconnected(Throwable t) { + cancelTimeoutFuture.run(); final MqttConnectionState connectionState = connection.connectionState(); future.complete(false); - connection.connectionObservers.forEach(o -> o.connectionStateChanged(connectionState, throwable)); + connection.connectionObservers.forEach(o -> o.connectionStateChanged(connectionState, t)); // If we tried to connect via start(), use the reconnect strategy to try it again if (connection.isConnecting) { @@ -168,15 +185,13 @@ public class MqttBrokerConnection { } } - /** Client callback object */ + // Client callback object protected ClientCallback clientCallback = new ClientCallback(this, connectionObservers, subscribers); - /** Connection callback object */ + // Connection callback object protected ConnectionCallback connectionCallback; - /** Action callback object */ - protected IMqttActionListener actionCallback = new MqttActionAdapterCallback(); /** - * Create a new TCP MQTT client connection to a server with the given host and port. + * Create a new TCP MQTT3 client connection to a server with the given host and port. * * @param host A host name or address * @param port A port or null to select the default port for a secure or insecure connection @@ -188,11 +203,11 @@ public class MqttBrokerConnection { * @throws IllegalArgumentException If the client id or port is not valid. */ public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, @Nullable String clientId) { - this(Protocol.TCP, host, port, secure, clientId); + this(Protocol.TCP, MqttVersion.V3, host, port, secure, clientId); } /** - * Create a new MQTT client connection to a server with the given protocol, host and port. + * Create a new MQTT3 client connection to a server with the given protocol, mqtt client version, host and port. * * @param protocol The transport protocol * @param host A host name or address @@ -204,14 +219,35 @@ public class MqttBrokerConnection { * characters. * @throws IllegalArgumentException If the client id or port is not valid. */ + @Deprecated public MqttBrokerConnection(Protocol protocol, String host, @Nullable Integer port, boolean secure, @Nullable String clientId) { + this(protocol, MqttVersion.V3, host, port, secure, clientId); + } + + /** + * Create a new MQTT client connection to a server with the given protocol, host and port. + * + * @param protocol The transport protocol + * @param mqttVersion The version of the MQTT client (v3 or v5) + * @param host A host name or address + * @param port A port or null to select the default port for a secure or insecure connection + * @param secure A secure connection + * @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are + * used for access restriction implementations. + * If none is specified, a default is generated. The client id cannot be longer than 65535 + * characters. + * @throws IllegalArgumentException If the client id or port is not valid. + */ + public MqttBrokerConnection(Protocol protocol, MqttVersion mqttVersion, String host, @Nullable Integer port, + boolean secure, @Nullable String clientId) { this.protocol = protocol; this.host = host; this.secure = secure; + this.mqttVersion = mqttVersion; String newClientID = clientId; if (newClientID == null) { - newClientID = MqttClient.generateClientId(); + newClientID = UUID.randomUUID().toString(); } else if (newClientID.length() > 65535) { throw new IllegalArgumentException("Client ID cannot be longer than 65535 characters"); } @@ -229,7 +265,7 @@ public class MqttBrokerConnection { * state to the MQTT broker changed. * * The reconnect strategy will not be informed if the initial connection to the broker - * timed out. You need a timeout executor additionally, see {@link #setTimeoutExecutor(Executor)}. + * timed out. You need a timeout executor additionally, see {@link #setTimeoutExecutor(ScheduledExecutorService, int)}. * * @param reconnectStrategy The reconnect strategy. May not be null. */ @@ -257,6 +293,23 @@ public class MqttBrokerConnection { this.timeout = timeoutInMS; } + public void setTrustManagers(TrustManager[] trustManagers) { + if (trustManagers.length != 0) { + trustManagerFactory = new CustomTrustManagerFactory(trustManagers); + } else { + trustManagerFactory = null; + } + sslContextProvider = new CustomSSLContextProvider(trustManagerFactory); + } + + public TrustManager[] getTrustManagers() { + if (trustManagerFactory != null) { + return trustManagerFactory.getTrustManagers(); + } else { + return new TrustManager[] {}; + } + } + /** * Get the MQTT broker protocol */ @@ -264,6 +317,13 @@ public class MqttBrokerConnection { return protocol; } + /** + * Get the MQTT version + */ + public MqttVersion getMqttVersion() { + return mqttVersion; + } + /** * Get the MQTT broker host */ @@ -327,25 +387,29 @@ public class MqttBrokerConnection { * @param qos level. */ public void setQos(int qos) { - if (qos >= 0 && qos <= 2) { - this.qos = qos; - } else { - throw new IllegalArgumentException("The quality of service parameter must be >=0 and <=2."); + if (qos < 0 || qos > 3) { + throw new IllegalArgumentException(); } + this.qos = qos; } /** + * use retain flags on message publish instead + * * @return true if newly messages sent to the broker should be retained by the broker. */ + @Deprecated public boolean isRetain() { return retain; } /** * Set whether newly published messages should be retained by the broker. + * use retain flags on message publish instead * * @param retain true to retain. */ + @Deprecated public void setRetain(boolean retain) { this.retain = retain; } @@ -398,8 +462,8 @@ public class MqttBrokerConnection { * * @param persistencePath the path that should be used to store persistent data */ + @Deprecated public void setPersistencePath(final @Nullable Path persistencePath) { - this.persistencePath = persistencePath; } /** @@ -418,7 +482,7 @@ public class MqttBrokerConnection { if (isConnecting) { return MqttConnectionState.CONNECTING; } - return (client != null && client.isConnected()) ? MqttConnectionState.CONNECTED + return (client != null && client.getState().isConnected()) ? MqttConnectionState.CONNECTED : MqttConnectionState.DISCONNECTED; } @@ -446,6 +510,7 @@ public class MqttBrokerConnection { /** * Return the ssl context provider. */ + @Deprecated public SSLContextProvider getSSLContextProvider() { return sslContextProvider; } @@ -456,8 +521,10 @@ public class MqttBrokerConnection { * @return The ssl context provider. Should not be null, but the ssl context will in fact * only be used if a ssl:// url is given. */ + @Deprecated public void setSSLContextProvider(SSLContextProvider sslContextProvider) { this.sslContextProvider = sslContextProvider; + trustManagerFactory = new CustomTrustManagerFactory(sslContextProvider); } /** @@ -487,17 +554,19 @@ public class MqttBrokerConnection { subscribers.put(topic, subscriberList); subscriberList.add(subscriber); } - final MqttAsyncClient client = this.client; + final MqttAsyncClientWrapper client = this.client; if (client == null) { future.completeExceptionally(new Exception("No MQTT client")); return future; } - if (client.isConnected()) { - try { - client.subscribe(topic, qos, future, actionCallback); - } catch (org.eclipse.paho.client.mqttv3.MqttException e) { - future.completeExceptionally(e); - } + if (client.getState().isConnected()) { + client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> { + if (t == null) { + future.complete(true); + } else { + future.completeExceptionally(new MqttException(t)); + } + }); } else { // The subscription will be performed on connecting. future.complete(false); @@ -514,16 +583,18 @@ public class MqttBrokerConnection { protected CompletableFuture subscribeRaw(String topic) { logger.trace("subscribeRaw message consumer for topic '{}' from broker '{}'", topic, host); CompletableFuture future = new CompletableFuture<>(); - try { - MqttAsyncClient client = this.client; - if (client != null && client.isConnected()) { - client.subscribe(topic, qos, future, actionCallback); - } else { - future.complete(false); - } - } catch (org.eclipse.paho.client.mqttv3.MqttException e) { - logger.info("Error subscribing to topic {}", topic, e); - future.completeExceptionally(e); + final MqttAsyncClientWrapper mqttClient = this.client; + if (mqttClient != null && mqttClient.getState().isConnected()) { + mqttClient.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> { + if (t == null) { + future.complete(true); + } else { + logger.warn("Failed subscribing to topic {}", topic, t); + future.completeExceptionally(new MqttException(t)); + } + }); + } else { + future.complete(false); } return future; } @@ -550,9 +621,9 @@ public class MqttBrokerConnection { // Remove from subscriber list subscribers.remove(topic); // No more subscribers to this topic. Unsubscribe topic on the broker - MqttAsyncClient client = this.client; - if (client != null) { - return unsubscribeRaw(client, topic); + MqttAsyncClientWrapper mqttClient = this.client; + if (mqttClient != null) { + return unsubscribeRaw(mqttClient, topic); } else { return CompletableFuture.completedFuture(false); } @@ -567,18 +638,19 @@ public class MqttBrokerConnection { * @return Completes with true if successful. Completes with false if no broker connection is established. * Exceptionally otherwise. */ - protected CompletableFuture unsubscribeRaw(MqttAsyncClient client, String topic) { + protected CompletableFuture unsubscribeRaw(MqttAsyncClientWrapper client, String topic) { logger.trace("Unsubscribing message consumer for topic '{}' from broker '{}'", topic, host); CompletableFuture future = new CompletableFuture<>(); - try { - if (client.isConnected()) { - client.unsubscribe(topic, future, actionCallback); - } else { - future.complete(false); - } - } catch (org.eclipse.paho.client.mqttv3.MqttException e) { - logger.info("Error unsubscribing topic from broker", e); - future.completeExceptionally(e); + if (client.getState().isConnected()) { + client.unsubscribe(topic).whenComplete((s, t) -> { + if (t == null) { + future.complete(true); + } else { + future.completeExceptionally(new MqttException(t)); + } + }); + } else { + return CompletableFuture.completedFuture(false); } return future; } @@ -608,33 +680,6 @@ public class MqttBrokerConnection { return !connectionObservers.isEmpty(); } - /** - * Create a MqttConnectOptions object using the fields of this MqttBrokerConnection instance. - * Package local, for testing. - */ - MqttConnectOptions createMqttOptions() throws ConfigurationException { - MqttConnectOptions options = new MqttConnectOptions(); - - if (!StringUtils.isBlank(user)) { - options.setUserName(user); - } - if (!StringUtils.isBlank(password) && password != null) { - options.setPassword(password.toCharArray()); - } - if (secure) { - options.setSocketFactory(sslContextProvider.getContext().getSocketFactory()); - } - - if (lastWill != null) { - MqttWillAndTestament lastWill = this.lastWill; // Make eclipse happy - options.setWill(lastWill.getTopic(), lastWill.getPayload(), lastWill.getQos(), lastWill.isRetain()); - } - - options.setKeepAliveInterval(keepAliveInterval); - options.setHttpsHostnameVerificationEnabled(false); - return options; - } - /** * This will establish a connection to the MQTT broker and if successful, notify all * publishers and subscribers that the connection has become active. This method will @@ -661,76 +706,29 @@ public class MqttBrokerConnection { } // Close client if there is still one existing - if (client != null) { - try { - client.close(); - } catch (org.eclipse.paho.client.mqttv3.MqttException ignore) { - } - client = null; + + if (this.client != null) { + this.client.disconnect(); + this.client = null; } CompletableFuture future = connectionCallback.createFuture(); - StringBuilder serverURI = new StringBuilder(); - switch (protocol) { - case TCP: - serverURI.append(secure ? "ssl://" : "tcp://"); - break; - case WEBSOCKETS: - serverURI.append(secure ? "wss://" : "ws://"); - break; - default: - future.completeExceptionally(new ConfigurationException("protocol", "Protocol unknown")); - return future; - } - serverURI.append(host); - serverURI.append(":"); - serverURI.append(port); - - // Storage - Path persistencePath = this.persistencePath; - if (persistencePath == null) { - persistencePath = Paths.get(ConfigConstants.getUserDataFolder()).resolve("mqtt").resolve(host); - } - try { - persistencePath = Files.createDirectories(persistencePath); - } catch (IOException e) { - future.completeExceptionally(new MqttException(e)); - return future; - } - MqttDefaultFilePersistence localDataStore = new MqttDefaultFilePersistence(persistencePath.toString()); - // Create the client - MqttAsyncClient localClient; - try { - localClient = createClient(serverURI.toString(), clientId, localDataStore); - } catch (org.eclipse.paho.client.mqttv3.MqttException e) { - future.completeExceptionally(new MqttException(e)); - return future; - } + MqttAsyncClientWrapper client = createClient(); + this.client = client; - // Assign to object - this.client = localClient; - this.dataStore = localDataStore; + // connect + client.connect(lastWill, keepAliveInterval, user, password); - // Connect - localClient.setCallback(clientCallback); - try { - MqttConnectOptions mqttConnectOptions = createMqttOptions(); - mqttConnectOptions.setMaxInflight(16384); // 1/4 of available message ids - localClient.connect(mqttConnectOptions, null, connectionCallback); - logger.info("Starting MQTT broker connection to '{}' with clientid {} and file store '{}'", host, - getClientId(), persistencePath); - } catch (org.eclipse.paho.client.mqttv3.MqttException | ConfigurationException e) { - future.completeExceptionally(new MqttException(e)); - return future; - } + logger.info("Starting MQTT broker connection to '{}' with clientid {}", host, getClientId()); // Connect timeout ScheduledExecutorService executor = timeoutExecutor; if (executor != null) { final ScheduledFuture timeoutFuture = this.timeoutFuture.getAndSet(executor.schedule( - () -> connectionCallback.onFailure(null, new TimeoutException()), timeout, TimeUnit.MILLISECONDS)); + () -> connectionCallback.onDisconnected(new TimeoutException("connect timed out")), timeout, + TimeUnit.MILLISECONDS)); if (timeoutFuture != null) { timeoutFuture.cancel(false); } @@ -738,18 +736,14 @@ public class MqttBrokerConnection { return future; } - /** - * Encapsulates the creation of the paho MqttAsyncClient - * - * @param serverURI A paho uri like ssl://host:port, tcp://host:port, ws[s]://host:port - * @param clientId the mqtt client ID - * @param dataStore The datastore to save qos!=0 messages until they are delivered. - * @return Returns a valid MqttAsyncClient - * @throws org.eclipse.paho.client.mqttv3.MqttException - */ - protected MqttAsyncClient createClient(String serverURI, String clientId, MqttClientPersistence dataStore) - throws org.eclipse.paho.client.mqttv3.MqttException { - return new MqttAsyncClient(serverURI, clientId, dataStore); + protected MqttAsyncClientWrapper createClient() { + if (mqttVersion == MqttVersion.V3) { + return new Mqtt3AsyncClientWrapper(host, port, clientId, protocol, secure, connectionCallback, + trustManagerFactory); + } else { + return new Mqtt5AsyncClientWrapper(host, port, clientId, protocol, secure, connectionCallback, + trustManagerFactory); + } } /** @@ -760,20 +754,11 @@ public class MqttBrokerConnection { * @return Returns the value of the parameter v. */ protected boolean finalizeStopAfterDisconnect(boolean v) { - if (client != null) { - try { - client.close(); - } catch (Exception ignore) { - } - } - client = null; - if (dataStore != null) { - try { - dataStore.close(); - } catch (Exception ignore) { - } - dataStore = null; + final MqttAsyncClientWrapper client = this.client; + if (client != null && connectionState() != MqttConnectionState.DISCONNECTED) { + client.disconnect(); } + this.client = null; connectionObservers.forEach(o -> o.connectionStateChanged(MqttConnectionState.DISCONNECTED, null)); return v; } @@ -784,7 +769,7 @@ public class MqttBrokerConnection { * @return Returns a future that completes as soon as all subscriptions have been canceled. */ public CompletableFuture unsubscribeAll() { - MqttAsyncClient client = this.client; + MqttAsyncClientWrapper client = this.client; List> futures = new ArrayList<>(); if (client != null) { subscribers.forEach((topic, subList) -> { @@ -804,7 +789,7 @@ public class MqttBrokerConnection { * @return Returns a future that completes as soon as the disconnect process has finished. */ public CompletableFuture stop() { - MqttAsyncClient client = this.client; + MqttAsyncClientWrapper client = this.client; if (client == null) { return CompletableFuture.completedFuture(true); } @@ -825,19 +810,15 @@ public class MqttBrokerConnection { CompletableFuture future = new CompletableFuture<>(); // Close connection - if (client.isConnected()) { - // We need to thread change here. Because paho does not allow to disconnect within a callback method - unsubscribeAll().thenRunAsync(() -> { - try { - client.disconnect(100).waitForCompletion(100); - if (client.isConnected()) { - client.disconnectForcibly(); + if (client.getState().isConnected()) { + unsubscribeAll().thenRun(() -> { + client.disconnect().whenComplete((m, t) -> { + if (t == null) { + future.complete(true); + } else { + future.complete(false); } - future.complete(true); - } catch (org.eclipse.paho.client.mqttv3.MqttException e) { - logger.debug("Error while closing connection to broker", e); - future.complete(false); - } + }); }); } else { future.complete(true); @@ -846,6 +827,18 @@ public class MqttBrokerConnection { return future.thenApply(this::finalizeStopAfterDisconnect); } + /** + * Publish a message to the broker. + * + * @param topic The topic + * @param payload The message payload + * @param listener A listener to be notified of success or failure of the delivery. + */ + @Deprecated + public void publish(String topic, byte[] payload, MqttActionCallback listener) { + publish(topic, payload, getQos(), isRetain(), listener); + } + /** * Publish a message to the broker with the given QoS and retained flag. * @@ -855,30 +848,22 @@ public class MqttBrokerConnection { * @param retain Set to true to retain the message on the broker * @param listener A listener to be notified of success or failure of the delivery. */ + @Deprecated public void publish(String topic, byte[] payload, int qos, boolean retain, MqttActionCallback listener) { - MqttAsyncClient localClient = client; - if (localClient == null) { - listener.onFailure(topic, new MqttException(0)); + final MqttAsyncClientWrapper client = this.client; + if (client == null) { + listener.onFailure(topic, new MqttException(new Throwable())); return; } - try { - IMqttDeliveryToken deliveryToken = localClient.publish(topic, payload, qos, retain, listener, - actionCallback); - logger.debug("Publishing message {} to topic '{}'", deliveryToken.getMessageId(), topic); - } catch (org.eclipse.paho.client.mqttv3.MqttException e) { - listener.onFailure(topic, new MqttException(e)); - } - } - /** - * Publish a message to the broker. - * - * @param topic The topic - * @param payload The message payload - * @param listener A listener to be notified of success or failure of the delivery. - */ - public void publish(String topic, byte[] payload, MqttActionCallback listener) { - publish(topic, payload, qos, retain, listener); + client.publish(topic, payload, retain, qos).whenComplete((m, t) -> { + if (t != null) { + listener.onFailure(topic, new MqttException(t)); + } else { + listener.onSuccess(topic); + } + }); + logger.debug("Publishing message to topic '{}'", topic); } /** @@ -890,7 +875,7 @@ public class MqttBrokerConnection { * exceptionally on an error or with a result of false if no broker connection is established. */ public CompletableFuture publish(String topic, byte[] payload) { - return publish(topic, payload, qos, retain); + return publish(topic, payload, getQos(), isRetain()); } /** @@ -900,23 +885,26 @@ public class MqttBrokerConnection { * @param payload The message payload * @param qos The quality of service for this message * @param retain Set to true to retain the message on the broker - * @param listener An optional listener to be notified of success or failure of the delivery. * @return Returns a future that completes with a result of true if the publishing succeeded and completes * exceptionally on an error or with a result of false if no broker connection is established. */ public CompletableFuture publish(String topic, byte[] payload, int qos, boolean retain) { - MqttAsyncClient client = this.client; + final MqttAsyncClientWrapper client = this.client; if (client == null) { return CompletableFuture.completedFuture(false); } + // publish message asynchronously - CompletableFuture f = new CompletableFuture<>(); - try { - client.publish(topic, payload, qos, retain, f, actionCallback); - } catch (org.eclipse.paho.client.mqttv3.MqttException e) { - f.completeExceptionally(new MqttException(e)); - } - return f; + CompletableFuture future = new CompletableFuture<>(); + client.publish(topic, payload, retain, qos).whenComplete((m, t) -> { + if (t == null) { + future.complete(true); + } else { + future.completeExceptionally(new MqttException(t)); + } + }); + + return future; } /** @@ -929,4 +917,5 @@ public class MqttBrokerConnection { timeoutFuture.cancel(false); } } + } diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionConfig.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionConfig.java index dd88f1340..041a1e4e3 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionConfig.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionConfig.java @@ -34,6 +34,7 @@ public class MqttBrokerConnectionConfig { public @Nullable String clientID; // MQTT parameters public Integer qos = MqttBrokerConnection.DEFAULT_QOS; + @Deprecated public Boolean retainMessages = false; /** Keepalive in seconds */ public @Nullable Integer keepAlive; diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttException.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttException.java index 730801a70..850e2213a 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttException.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttException.java @@ -12,12 +12,15 @@ */ package org.eclipse.smarthome.io.transport.mqtt; +import org.eclipse.jdt.annotation.NonNullByDefault; + /** * Thrown if an error occurs communicating with the server. The exception contains a reason code. The semantic of the * reason code depends on the underlying implementation. * * @author David Graeff - Initial contribution */ +@NonNullByDefault public class MqttException extends Exception { private static final long serialVersionUID = 301L; private int reasonCode; @@ -29,10 +32,22 @@ public class MqttException extends Exception { * * @param reasonCode the reason code for the exception. */ + @Deprecated public MqttException(int reasonCode) { + this.cause = new Exception(); this.reasonCode = reasonCode; } + /** + * Constructs a new MqttException with the specified reason + * + * @param reason the reason for the exception. + */ + public MqttException(String reason) { + this.cause = new Exception("reason"); + this.reasonCode = Integer.MIN_VALUE; + } + /** * Constructs a new MqttException with the specified * Throwable as the underlying reason. @@ -40,12 +55,7 @@ public class MqttException extends Exception { * @param cause the underlying cause of the exception. */ public MqttException(Throwable cause) { - if (cause instanceof org.eclipse.paho.client.mqttv3.MqttException) { - org.eclipse.paho.client.mqttv3.MqttException internalException = (org.eclipse.paho.client.mqttv3.MqttException) cause; - this.reasonCode = internalException.getReasonCode(); - } else { - this.reasonCode = org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_EXCEPTION; - } + this.reasonCode = Integer.MIN_VALUE; this.cause = cause; } @@ -56,6 +66,7 @@ public class MqttException extends Exception { * @param reason the reason code for the exception. * @param cause the underlying cause of the exception. */ + @Deprecated public MqttException(int reason, Throwable cause) { this.reasonCode = reason; this.cause = cause; @@ -66,6 +77,7 @@ public class MqttException extends Exception { * * @return the code representing the reason for this exception. */ + @Deprecated public int getReasonCode() { return reasonCode; } @@ -86,11 +98,7 @@ public class MqttException extends Exception { */ @Override public String getMessage() { - if (cause != null) { - return cause.getMessage(); - } - - return "MqttException with reason " + String.valueOf(reasonCode); + return cause.getMessage(); } /** @@ -98,14 +106,6 @@ public class MqttException extends Exception { */ @Override public String toString() { - if (cause instanceof org.eclipse.paho.client.mqttv3.MqttException) { - org.eclipse.paho.client.mqttv3.MqttException internalException = (org.eclipse.paho.client.mqttv3.MqttException) cause; - return internalException.toString(); - } - String result = getMessage() + " (" + reasonCode + ")"; - if (cause != null) { - result = result + " - " + cause.toString(); - } - return result; + return cause.toString(); } } diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ClientCallback.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ClientCallback.java index e30c28fa7..b2a3a0360 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ClientCallback.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ClientCallback.java @@ -16,10 +16,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection; import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionObserver; import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionState; @@ -29,12 +28,17 @@ import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrate import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; + /** - * Processes the paho MqttCallbacks for the {@link MqttBrokerConnection}. + * Processes the MqttCallbacks for the {@link MqttBrokerConnection}. * * @author David Graeff - Initial contribution + * @author Jan N. Klug - adjusted to HiveMQ client */ -public class ClientCallback implements MqttCallback { +@NonNullByDefault +public class ClientCallback { final Logger logger = LoggerFactory.getLogger(ClientCallback.class); private final MqttBrokerConnection connection; private final List connectionObservers; @@ -47,12 +51,11 @@ public class ClientCallback implements MqttCallback { this.subscribers = subscribers; } - @Override public synchronized void connectionLost(@Nullable Throwable exception) { if (exception instanceof MqttException) { MqttException e = (MqttException) exception; - logger.info("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}", connection.getHost(), - e.getMessage(), e.getReasonCode(), (e.getCause() == null ? "Unknown" : e.getCause().getMessage())); + logger.info("MQTT connection to '{}' was lost: {} : Cause : {}", connection.getHost(), e.getMessage(), + e.getCause().getMessage()); } else if (exception != null) { logger.info("MQTT connection to '{}' was lost", connection.getHost(), exception); } @@ -64,30 +67,33 @@ public class ClientCallback implements MqttCallback { } } - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - logger.trace("Message with id {} delivered.", token.getMessageId()); + public void messageArrived(Mqtt3Publish message) { + messageArrived(message.getTopic(), message.getPayloadAsBytes()); } - @Override - public void messageArrived(String topic, MqttMessage message) { - byte[] payload = message.getPayload(); + public void messageArrived(Mqtt5Publish message) { + messageArrived(message.getTopic(), message.getPayloadAsBytes()); + } + + private void messageArrived(MqttTopic topic, byte[] payload) { + String topicString = topic.toString(); logger.trace("Received message on topic '{}' : {}", topic, new String(payload)); - List matches = new ArrayList<>(); + + List matchingSubscribers = new ArrayList<>(); synchronized (subscribers) { subscribers.values().forEach(subscriberList -> { - if (subscriberList.topicMatch(topic)) { + if (subscriberList.topicMatch(topicString)) { logger.trace("Topic match for '{}' using regex {}", topic, subscriberList.getTopicRegexPattern()); - subscriberList.forEach(consumer -> matches.add(consumer)); + subscriberList.forEach(consumer -> matchingSubscribers.add(consumer)); } else { logger.trace("No topic match for '{}' using regex {}", topic, subscriberList.getTopicRegexPattern()); } }); - } + try { - matches.forEach(subscriber -> subscriber.processMessage(topic, payload)); + matchingSubscribers.forEach(subscriber -> subscriber.processMessage(topicString, payload)); } catch (Exception e) { logger.error("MQTT message received. MqttMessageSubscriber#processMessage() implementation failure", e); } diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttActionAdapterCallback.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttActionAdapterCallback.java deleted file mode 100644 index ca923bedd..000000000 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttActionAdapterCallback.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright (c) 2010-2019 Contributors to the openHAB project - * - * See the NOTICE file(s) distributed with this work for additional - * information. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.smarthome.io.transport.mqtt.internal; - -import java.util.concurrent.CompletableFuture; - -import org.eclipse.paho.client.mqttv3.IMqttActionListener; -import org.eclipse.paho.client.mqttv3.IMqttToken; -import org.eclipse.smarthome.io.transport.mqtt.MqttActionCallback; - -/** - * Create a IMqttActionListener object for being used as a callback for a publish attempt. - * - * @author David Graeff - Initial contribution - */ -public class MqttActionAdapterCallback implements IMqttActionListener { - @Override - public void onSuccess(IMqttToken token) { - if (token.getUserContext() instanceof MqttActionCallback) { - MqttActionCallback subscriber = (MqttActionCallback) token.getUserContext(); - subscriber.onSuccess(token.getTopics()[0]); - } else if (token.getUserContext() instanceof CompletableFuture) { - @SuppressWarnings("unchecked") - CompletableFuture future = (CompletableFuture) token.getUserContext(); - future.complete(true); - } - } - - @Override - public void onFailure(IMqttToken token, Throwable throwable) { - if (token.getUserContext() instanceof MqttActionCallback) { - MqttActionCallback subscriber = (MqttActionCallback) token.getUserContext(); - subscriber.onFailure(token.getTopics()[0], throwable); - } else if (token.getUserContext() instanceof CompletableFuture) { - @SuppressWarnings("unchecked") - CompletableFuture future = (CompletableFuture) token.getUserContext(); - future.completeExceptionally(throwable); - } - } -} diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnectionServiceInstance.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnectionServiceInstance.java index a6fdecf2a..05c6b33e5 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnectionServiceInstance.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnectionServiceInstance.java @@ -15,7 +15,6 @@ package org.eclipse.smarthome.io.transport.mqtt.internal; import java.util.Map; import org.apache.commons.lang.StringUtils; -import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.eclipse.smarthome.config.core.Configuration; @@ -68,10 +67,11 @@ public class MqttBrokerConnectionServiceInstance { connection.stop(); } - if (configMap == null || configMap.isEmpty() || mqttService == null) { + final MqttServiceImpl service = (MqttServiceImpl) mqttService; + + if (configMap == null || configMap.isEmpty() || service == null) { return; } - final @NonNull MqttServiceImpl service = (@NonNull MqttServiceImpl) mqttService; // Parse configuration MqttBrokerConnectionConfig config = new Configuration(configMap).as(MqttBrokerConnectionConfig.class); @@ -79,7 +79,7 @@ public class MqttBrokerConnectionServiceInstance { try { // Compute brokerID and make sure it is not empty String brokerID = config.getBrokerID(); - if (StringUtils.isBlank(brokerID) || brokerID == null) { + if (StringUtils.isBlank(brokerID)) { logger.warn("Ignore invalid broker connection configuration: {}", config); return; } diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java new file mode 100644 index 000000000..c18677cf8 --- /dev/null +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java @@ -0,0 +1,108 @@ +/** + * Copyright (c) 2010-2019 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.smarthome.io.transport.mqtt.internal.client; + +import java.util.concurrent.CompletableFuture; + +import javax.net.ssl.TrustManagerFactory; + +import org.apache.commons.lang.StringUtils; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.ConnectionCallback; +import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.Protocol; +import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament; +import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback; + +import com.hivemq.client.mqtt.MqttClientState; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3Client; +import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder; +import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3Connect; +import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe; +import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe; + +/** + * The {@link Mqtt3AsyncClientWrapper} provides the wrapper for Mqttv3 async clients + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class Mqtt3AsyncClientWrapper extends MqttAsyncClientWrapper { + private final Mqtt3AsyncClient client; + + public Mqtt3AsyncClientWrapper(String host, int port, String clientId, Protocol protocol, boolean secure, + ConnectionCallback connectionCallback, @Nullable TrustManagerFactory trustManagerFactory) { + Mqtt3ClientBuilder clientBuilder = Mqtt3Client.builder().serverHost(host).serverPort(port).identifier(clientId) + .addConnectedListener(connectionCallback).addDisconnectedListener(connectionCallback); + + if (protocol == Protocol.WEBSOCKETS) { + clientBuilder.webSocketWithDefaultConfig(); + } + if (secure) { + clientBuilder.sslWithDefaultConfig().sslConfig().trustManagerFactory(trustManagerFactory).applySslConfig(); + } + + client = clientBuilder.buildAsync(); + }; + + @Override + public MqttClientState getState() { + return client.getState(); + } + + @Override + public CompletableFuture subscribe(String topic, int qos, ClientCallback clientCallback) { + Mqtt3Subscribe subscribeMessage = Mqtt3Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos)) + .build(); + return client.subscribe(subscribeMessage, clientCallback::messageArrived); + } + + @Override + public CompletableFuture unsubscribe(String topic) { + Mqtt3Unsubscribe unsubscribeMessage = Mqtt3Unsubscribe.builder().topicFilter(topic).build(); + return client.unsubscribe(unsubscribeMessage); + } + + @Override + public CompletableFuture publish(String topic, byte[] payload, boolean retain, int qos) { + Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic(topic).qos(getMqttQosFromInt(qos)).payload(payload) + .retain(retain).build(); + return client.publish(publishMessage); + } + + @Override + public CompletableFuture connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval, + @Nullable String username, @Nullable String password) { + Mqtt3ConnectBuilder connectMessageBuilder = Mqtt3Connect.builder().keepAlive(keepAliveInterval); + if (lwt != null) { + Mqtt3Publish willPublish = Mqtt3Publish.builder().topic(lwt.getTopic()).payload(lwt.getPayload()) + .retain(lwt.isRetain()).qos(getMqttQosFromInt(lwt.getQos())).build(); + connectMessageBuilder.willPublish(willPublish); + } + + if (!StringUtils.isBlank(username) && !StringUtils.isBlank(password) && password != null) { + connectMessageBuilder.simpleAuth().username(username).password(password.getBytes()).applySimpleAuth(); + } + + return client.connect(connectMessageBuilder.build()); + } + + @Override + public CompletableFuture disconnect() { + return client.disconnect(); + } + +} diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java new file mode 100644 index 000000000..9f01d5986 --- /dev/null +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java @@ -0,0 +1,109 @@ +/** + * Copyright (c) 2010-2019 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.smarthome.io.transport.mqtt.internal.client; + +import java.util.concurrent.CompletableFuture; + +import javax.net.ssl.TrustManagerFactory; + +import org.apache.commons.lang.StringUtils; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.ConnectionCallback; +import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.Protocol; +import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament; +import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback; + +import com.hivemq.client.mqtt.MqttClientState; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe; +import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe; + +/** + * The {@link Mqtt5AsyncClientWrapper} provides the wrapper for Mqttv3 async clients + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class Mqtt5AsyncClientWrapper extends MqttAsyncClientWrapper { + private final Mqtt5AsyncClient client; + + public Mqtt5AsyncClientWrapper(String host, int port, String clientId, Protocol protocol, boolean secure, + ConnectionCallback connectionCallback, @Nullable TrustManagerFactory trustManagerFactory) { + Mqtt5ClientBuilder clientBuilder = Mqtt5Client.builder().serverHost(host).serverPort(port).identifier(clientId) + .addConnectedListener(connectionCallback).addDisconnectedListener(connectionCallback); + + if (protocol == Protocol.WEBSOCKETS) { + clientBuilder.webSocketWithDefaultConfig(); + } + if (secure) { + clientBuilder.sslWithDefaultConfig().sslConfig().trustManagerFactory(trustManagerFactory).applySslConfig(); + } + + client = clientBuilder.buildAsync(); + }; + + @Override + public MqttClientState getState() { + return client.getState(); + } + + @Override + public CompletableFuture subscribe(String topic, int qos, ClientCallback clientCallback) { + Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos)) + .build(); + return client.subscribe(subscribeMessage, clientCallback::messageArrived); + } + + @Override + public CompletableFuture unsubscribe(String topic) { + Mqtt5Unsubscribe unsubscribeMessage = Mqtt5Unsubscribe.builder().topicFilter(topic).build(); + return client.unsubscribe(unsubscribeMessage); + } + + @Override + public CompletableFuture publish(String topic, byte[] payload, boolean retain, int qos) { + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(getMqttQosFromInt(qos)).payload(payload) + .retain(retain).build(); + return client.publish(publishMessage); + } + + @Override + public CompletableFuture connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval, + @Nullable String username, @Nullable String password) { + Mqtt5ConnectBuilder connectMessageBuilder = Mqtt5Connect.builder().keepAlive(keepAliveInterval); + if (lwt != null) { + Mqtt5Publish willPublish = Mqtt5Publish.builder().topic(lwt.getTopic()).payload(lwt.getPayload()) + .retain(lwt.isRetain()).qos(getMqttQosFromInt(lwt.getQos())).build(); + connectMessageBuilder.willPublish(willPublish); + } + + if (!StringUtils.isBlank(username) && !StringUtils.isBlank(password) && password != null) { + connectMessageBuilder.simpleAuth().username(username).password(password.getBytes()).applySimpleAuth(); + } + + return client.connect(connectMessageBuilder.build()); + } + + @Override + public CompletableFuture disconnect() { + return client.disconnect(); + } + +} diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java new file mode 100644 index 000000000..b754837a3 --- /dev/null +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2010-2019 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.smarthome.io.transport.mqtt.internal.client; + +import java.util.concurrent.CompletableFuture; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament; +import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback; + +import com.hivemq.client.mqtt.MqttClientState; +import com.hivemq.client.mqtt.datatypes.MqttQos; + +/** + * The {@link AbstractMqttAsyncClient} is the base class for async client wrappers + * + * @author Jan N. Klug - Initial contribution + */ + +@NonNullByDefault +public abstract class MqttAsyncClientWrapper { + public abstract CompletableFuture connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval, + @Nullable String username, @Nullable String password); + + public abstract CompletableFuture disconnect(); + + public abstract MqttClientState getState(); + + public abstract CompletableFuture subscribe(String topic, int qos, ClientCallback clientCallback); + + public abstract CompletableFuture unsubscribe(String topic); + + public abstract CompletableFuture publish(String topic, byte[] payload, boolean retain, int qos); + + protected MqttQos getMqttQosFromInt(int qos) { + switch (qos) { + case 0: + return MqttQos.AT_LEAST_ONCE; + case 1: + return MqttQos.AT_MOST_ONCE; + case 2: + return MqttQos.EXACTLY_ONCE; + default: + throw new IllegalArgumentException("QoS needs to be 0, 1 or 2."); + } + } +} diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/ssl/CustomTrustManagerFactory.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/ssl/CustomTrustManagerFactory.java new file mode 100644 index 000000000..27d732f90 --- /dev/null +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/ssl/CustomTrustManagerFactory.java @@ -0,0 +1,85 @@ +/** + * Copyright (c) 2010-2019 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.smarthome.io.transport.mqtt.ssl; + +import java.lang.reflect.Field; +import java.security.KeyStore; + +import javax.net.ssl.ManagerFactoryParameters; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.smarthome.io.transport.mqtt.sslcontext.SSLContextProvider; +import org.osgi.service.cm.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SimpleTrustManagerFactory; + +/** + * The {@link CustomTrustManagerFactory} is a TrustManagerFactory that provides a custom {@link TrustManager} + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class CustomTrustManagerFactory extends SimpleTrustManagerFactory { + private final Logger logger = LoggerFactory.getLogger(CustomTrustManagerFactory.class); + private final TrustManager[] trustManagers; + + public CustomTrustManagerFactory(TrustManager[] trustManagers) { + this.trustManagers = trustManagers; + } + + @Deprecated + public CustomTrustManagerFactory(SSLContextProvider contextProvider) { + TrustManager[] tm; + try { + SSLContext ctx = contextProvider.getContext(); + + // get SSLContextImpl + Field contextSpiField = ctx.getClass().getDeclaredField("contextSpi"); + contextSpiField.setAccessible(true); + Object sslContextImpl = contextSpiField.get(ctx); + Class sslContextImplClass = sslContextImpl.getClass().getSuperclass().getSuperclass(); + + // get trustmanager + Field trustManagerField = sslContextImplClass.getDeclaredField("trustManager"); + trustManagerField.setAccessible(true); + Object trustManagerObj = trustManagerField.get(sslContextImpl); + + tm = new TrustManager[] { (X509TrustManager) trustManagerObj }; + } catch (IllegalAccessException | NoSuchFieldException | ConfigurationException e) { + logger.warn("using default insecure trustmanager, could not extract trustmanager from SSL context:", e); + tm = InsecureTrustManagerFactory.INSTANCE.getTrustManagers(); + } + trustManagers = tm; + } + + @Override + protected void engineInit(@Nullable KeyStore keyStore) throws Exception { + } + + @Override + protected void engineInit(@Nullable ManagerFactoryParameters managerFactoryParameters) throws Exception { + } + + @Override + protected TrustManager[] engineGetTrustManagers() { + return trustManagers; + } + +} diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/AcceptAllCertificatesSSLContext.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/AcceptAllCertificatesSSLContext.java index 819eda6d2..196edd911 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/AcceptAllCertificatesSSLContext.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/AcceptAllCertificatesSSLContext.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; * * @author David Graeff - Initial contribution */ +@Deprecated public class AcceptAllCertificatesSSLContext implements SSLContextProvider { private final Logger logger = LoggerFactory.getLogger(AcceptAllCertificatesSSLContext.class); diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/CustomSSLContextProvider.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/CustomSSLContextProvider.java new file mode 100644 index 000000000..29b7973c2 --- /dev/null +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/CustomSSLContextProvider.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2010-2019 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.smarthome.io.transport.mqtt.sslcontext; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import org.osgi.service.cm.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This SSLContextProvider returns an {@link SSLContext} that accepts all connections and doesn't perform any + * certificate validations. This implementation forces a TLS v1.2 {@link SSLContext} instance. + * + * @author Jan N. Klug - Initial contribution + */ +@Deprecated +public class CustomSSLContextProvider implements SSLContextProvider { + private final Logger logger = LoggerFactory.getLogger(CustomSSLContextProvider.class); + private final TrustManagerFactory factory; + + public CustomSSLContextProvider(TrustManagerFactory factory) { + this.factory = factory; + } + + @Override + public SSLContext getContext() throws ConfigurationException { + try { + if (factory == null) { + return SSLContext.getDefault(); + } else { + SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); + sslContext.init(null, factory.getTrustManagers(), null); + return sslContext; + } + } catch (KeyManagementException | NoSuchAlgorithmException e) { + logger.warn("SSL configuration failed", e); + throw new ConfigurationException("ssl", e.getMessage()); + } + } + +} diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/SSLContextProvider.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/SSLContextProvider.java index 841775aa7..1f158bd7f 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/SSLContextProvider.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/SSLContextProvider.java @@ -24,6 +24,7 @@ import org.osgi.service.cm.ConfigurationException; * * @author David Graeff - Initial contribution */ +@Deprecated public interface SSLContextProvider { /** * Return an {@link SSLContext} to be used by secure Mqtt broker connections. Never return null here. If you are not diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttAsyncClientEx.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttAsyncClientEx.java deleted file mode 100644 index 0e7d964c8..000000000 --- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttAsyncClientEx.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Copyright (c) 2010-2019 Contributors to the openHAB project - * - * See the NOTICE file(s) distributed with this work for additional - * information. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.smarthome.io.transport.mqtt; - -import static org.mockito.Mockito.*; - -import java.util.Collections; - -import org.eclipse.paho.client.mqttv3.IMqttActionListener; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.IMqttToken; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttPersistenceException; -import org.eclipse.paho.client.mqttv3.MqttSecurityException; - -/** - * We need an extended MqttAsyncClientEx to overwrite the connection state. - * - * In respect to the success flags the operations publish, subscribe, unsubscribe, connect, - * disconnect immediately succeed or fail. - * - * @author David Graeff - Initial contribution - */ -public class MqttAsyncClientEx extends MqttAsyncClient { - public MqttBrokerConnectionEx connection; - - public MqttAsyncClientEx(String serverURI, String clientId, MqttClientPersistence dataStore, - MqttBrokerConnectionEx connection) throws MqttException { - super(serverURI, clientId, dataStore); - this.connection = connection; - } - - IMqttToken getToken(Object userContext, IMqttActionListener callback, String topic) { - IMqttToken t = mock(IMqttToken.class); - doReturn(userContext).when(t).getUserContext(); - doReturn(true).when(t).isComplete(); - doReturn(Collections.singletonList(topic).toArray(new String[1])).when(t).getTopics(); - doReturn(MqttAsyncClientEx.this).when(t).getClient(); - doReturn(callback).when(t).getActionCallback(); - doReturn(null).when(t).getException(); - return t; - } - - IMqttDeliveryToken getDeliveryToken(Object userContext, IMqttActionListener callback, String topic) { - IMqttDeliveryToken t = mock(IMqttDeliveryToken.class); - doReturn(userContext).when(t).getUserContext(); - doReturn(true).when(t).isComplete(); - doReturn(Collections.singletonList(topic).toArray(new String[1])).when(t).getTopics(); - doReturn(MqttAsyncClientEx.this).when(t).getClient(); - doReturn(callback).when(t).getActionCallback(); - doReturn(null).when(t).getException(); - return t; - } - - @Override - public boolean isConnected() { - return connection.connectionStateOverwrite == MqttConnectionState.CONNECTED; - } - - @Override - public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext, - IMqttActionListener callback) throws MqttException, MqttPersistenceException { - if (connection.publishSuccess) { - callback.onSuccess(getToken(userContext, callback, topic)); - } else { - callback.onFailure(getToken(userContext, callback, topic), new MqttException(0)); - } - return getDeliveryToken(userContext, callback, topic); - } - - @Override - public IMqttToken subscribe(String topic, int qos, Object userContext, IMqttActionListener callback) - throws MqttException { - if (connection.publishSuccess) { - callback.onSuccess(getToken(userContext, callback, topic)); - } else { - callback.onFailure(getToken(userContext, callback, topic), new MqttException(0)); - } - return getToken(userContext, callback, topic); - } - - @Override - public IMqttToken unsubscribe(String topic, Object userContext, IMqttActionListener callback) throws MqttException { - if (connection.unsubscribeSuccess) { - callback.onSuccess(getToken(userContext, callback, topic)); - } else { - callback.onFailure(getToken(userContext, callback, topic), new MqttException(0)); - } - return getToken(userContext, callback, topic); - } - - @Override - public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback) - throws MqttException { - connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED; - if (callback != null) { - if (connection.disconnectSuccess) { - callback.onSuccess(getToken(userContext, callback, null)); - } else { - callback.onFailure(getToken(userContext, callback, null), new MqttException(0)); - } - } - return getToken(userContext, callback, null); - } - - @Override - public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) - throws MqttException, MqttSecurityException { - if (!connection.connectTimeout) { - connection.connectionStateOverwrite = MqttConnectionState.CONNECTED; - if (connection.connectSuccess) { - callback.onSuccess(getToken(userContext, callback, null)); - } else { - callback.onFailure(getToken(userContext, callback, null), new MqttException(0)); - } - } else { - connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED; - } - return getToken(userContext, callback, null); - } -} diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionEx.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionEx.java index aef4127f7..271bf2079 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionEx.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionEx.java @@ -12,22 +12,27 @@ */ package org.eclipse.smarthome.io.transport.mqtt; -import static org.mockito.Mockito.spy; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.CompletableFuture; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper; + +import com.hivemq.client.mqtt.MqttClientState; /** * We need an extended MqttBrokerConnection to overwrite the protected `connectionCallbacks` with * an instance that takes the mocked version of `MqttBrokerConnection` and overwrite the connection state. * - * We also replace the internal MqttAsyncClient with a spied one, that in respect to the success flags + * We also mock the internal Mqtt3AsyncClient that in respect to the success flags * immediately succeed or fail with publish, subscribe, unsubscribe, connect, disconnect. * * @author David Graeff - Initial contribution + * @author Jan N. Klug - adjusted to HiveMQ client */ @NonNullByDefault public class MqttBrokerConnectionEx extends MqttBrokerConnection { @@ -48,9 +53,50 @@ public class MqttBrokerConnectionEx extends MqttBrokerConnection { } @Override - protected MqttAsyncClient createClient(String serverURI, String clientId, MqttClientPersistence dataStore) - throws org.eclipse.paho.client.mqttv3.MqttException { - return spy(new MqttAsyncClientEx(serverURI, clientId, dataStore, this)); + protected MqttAsyncClientWrapper createClient() { + MqttAsyncClientWrapper mockedClient = mock(MqttAsyncClientWrapper.class); + // connect + doAnswer(i -> { + if (!connectTimeout) { + connectionCallback.onConnected(null); + connectionStateOverwrite = MqttConnectionState.CONNECTED; + return CompletableFuture.completedFuture(null); + } + return new CompletableFuture(); + }).when(mockedClient).connect(any(), anyInt(), any(), any()); + doAnswer(i -> { + if (disconnectSuccess) { + connectionCallback.onDisconnected(new Throwable("disconnect")); + connectionStateOverwrite = MqttConnectionState.DISCONNECTED; + return CompletableFuture.completedFuture(null); + } + return new CompletableFuture(); + }).when(mockedClient).disconnect(); + // subscribe + doAnswer(i -> { + if (subscribeSuccess) { + return CompletableFuture.completedFuture(null); + } else { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new Throwable("subscription failed")); + return future; + } + }).when(mockedClient).subscribe(any(), anyInt(), any()); + // unsubscribe + doAnswer(i -> { + if (unsubscribeSuccess) { + return CompletableFuture.completedFuture(null); + } else { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new Throwable("unsubscription failed")); + return future; + } + }).when(mockedClient).unsubscribe(any()); + // state + doAnswer(i -> { + return MqttClientState.CONNECTED; + }).when(mockedClient).getState(); + return mockedClient; } @Override diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java index 9531eebfe..213fe5abe 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java @@ -28,22 +28,25 @@ import java.util.concurrent.TimeoutException; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; -import org.eclipse.paho.client.mqttv3.IMqttToken; -import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper; import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy; import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy; import org.junit.Test; import org.osgi.service.cm.ConfigurationException; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; + /** * Tests the MqttBrokerConnection class * * @author David Graeff - Initial contribution + * @author Jan N. Klug - adjusted to HiveMQ client */ public class MqttBrokerConnectionTests { @Test - public void subscribeBeforeOnlineThenConnect() throws ConfigurationException, MqttException, InterruptedException, - ExecutionException, TimeoutException, org.eclipse.paho.client.mqttv3.MqttException { + public void subscribeBeforeOnlineThenConnect() + throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException { MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, "MqttBrokerConnectionTests"); @@ -55,14 +58,16 @@ public class MqttBrokerConnectionTests { assertTrue(connection.hasSubscribers()); assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)); + Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("homie/device123/$name").payload("hello".getBytes()) + .build(); // Test if subscription is active - connection.clientCallback.messageArrived("homie/device123/$name", new MqttMessage("hello".getBytes())); + connection.clientCallback.messageArrived(publishMessage); verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes())); } @Test - public void subscribeToWildcardTopic() throws ConfigurationException, MqttException, InterruptedException, - ExecutionException, TimeoutException, org.eclipse.paho.client.mqttv3.MqttException { + public void subscribeToWildcardTopic() + throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException { MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, "MqttBrokerConnectionTests"); @@ -80,7 +85,9 @@ public class MqttBrokerConnectionTests { assertTrue(connection.hasSubscribers()); assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)); - connection.clientCallback.messageArrived("homie/device123/$name", new MqttMessage("hello".getBytes())); + Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("homie/device123/$name").payload("hello".getBytes()) + .build(); + connection.clientCallback.messageArrived(publishMessage); verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes())); verify(subscriber2).processMessage(eq("homie/device123/$name"), eq("hello".getBytes())); @@ -88,8 +95,8 @@ public class MqttBrokerConnectionTests { } @Test - public void subscriber() throws ConfigurationException, MqttException, InterruptedException, ExecutionException, - TimeoutException, org.eclipse.paho.client.mqttv3.MqttException { + public void subscriber() + throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException { MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, "MqttBrokerConnectionTests"); @@ -116,7 +123,7 @@ public class MqttBrokerConnectionTests { // Add subscriber (while connected) CompletableFuture future = connection.subscribe("topic", subscriber); - verify(connection.client).subscribe(any(), anyInt(), any(), any()); + verify(connection.client).subscribe(any(), anyInt(), any()); assertTrue(future.get(200, TimeUnit.MILLISECONDS)); // Remove subscriber (while connected) @@ -155,11 +162,9 @@ public class MqttBrokerConnectionTests { // Fake a disconnect connection.setReconnectStrategy(mockPolicy); doReturn(MqttConnectionState.DISCONNECTED).when(connection).connectionState(); - IMqttToken token = mock(IMqttToken.class); - when(token.getException()).thenReturn(new org.eclipse.paho.client.mqttv3.MqttException(1)); connection.isConnecting = true; /* Pretend that start did something */ - connection.connectionCallback.onFailure(token, null); + connection.connectionCallback.onDisconnected(new Throwable("disconnected")); // Check lostConnect verify(mockPolicy).lostConnection(); @@ -168,7 +173,7 @@ public class MqttBrokerConnectionTests { assertTrue(mockPolicy.isReconnecting()); // Fake connection established - connection.connectionCallback.onSuccess(token); + connection.connectionCallback.onConnected(null); assertFalse(mockPolicy.isReconnecting()); } @@ -221,8 +226,8 @@ public class MqttBrokerConnectionTests { CompletableFuture future = connection.start(); verify(connection.connectionCallback).createFuture(); - verify(connection.connectionCallback, times(0)).onSuccess(any()); - verify(connection.connectionCallback, times(0)).onFailure(any(), any()); + verify(connection.connectionCallback, times(0)).onConnected(any()); + verify(connection.connectionCallback, times(0)).onDisconnected(any(MqttClientDisconnectedContext.class)); assertNotNull(connection.timeoutFuture); assertThat(future.get(70, TimeUnit.MILLISECONDS), is(false)); @@ -245,17 +250,13 @@ public class MqttBrokerConnectionTests { // Cause a success callback connection.connectionStateOverwrite = MqttConnectionState.CONNECTED; - connection.connectionCallback.onSuccess(null); + connection.connectionCallback.onConnected(null); verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.CONNECTED), isNull()); - // Cause a failure callback with a mocked token - IMqttToken token = mock(IMqttToken.class); - org.eclipse.paho.client.mqttv3.MqttException testException = new org.eclipse.paho.client.mqttv3.MqttException( - 1); - when(token.getException()).thenReturn(testException); + Exception testException = new Exception("test message"); connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED; - connection.connectionCallback.onFailure(token, null); + connection.connectionCallback.onDisconnected(testException); verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.DISCONNECTED), eq(testException)); @@ -318,7 +319,6 @@ public class MqttBrokerConnectionTests { assertEquals(1, connection.getQos()); // Check for default ssl context provider and reconnect policy - assertNotNull(connection.getSSLContextProvider()); assertNotNull(connection.getReconnectStrategy()); assertThat(connection.connectionState(), equalTo(MqttConnectionState.DISCONNECTED)); @@ -326,8 +326,8 @@ public class MqttBrokerConnectionTests { @SuppressWarnings("null") @Test - public void gracefulStop() throws ConfigurationException, MqttException, InterruptedException, ExecutionException, - TimeoutException, org.eclipse.paho.client.mqttv3.MqttException { + public void gracefulStop() + throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException { MqttBrokerConnectionEx connection = spy( new MqttBrokerConnectionEx("123.123.123.123", null, false, "MqttBrokerConnectionTests")); @@ -340,7 +340,7 @@ public class MqttBrokerConnectionTests { assertThat(connection.hasSubscribers(), is(true)); // Let's observe the internal connection client - MqttAsyncClientEx client = (MqttAsyncClientEx) connection.client; + MqttAsyncClientWrapper client = connection.client; // Stop CompletableFuture future = connection.stop(); @@ -353,7 +353,7 @@ public class MqttBrokerConnectionTests { future.get(1000, TimeUnit.MILLISECONDS); verify(connection).unsubscribeAll(); - verify(client).disconnect(anyLong(), any(), any()); + verify(client).disconnect(); // Subscribers should be removed assertThat(connection.hasSubscribers(), is(false)); diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttServiceTests.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttServiceTests.java index 8fd3274f9..baa7ffc3d 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttServiceTests.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttServiceTests.java @@ -40,7 +40,7 @@ public class MqttServiceTests { service.addBrokersListener(observer); assertTrue(service.hasBrokerObservers()); - MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("tcp://123.123.123.123", null, false, + MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, "brokerConnectionListenerTests"); assertTrue(service.addBrokerConnection("name", connection)); diff --git a/features/karaf/openhab-core/src/main/feature/feature.xml b/features/karaf/openhab-core/src/main/feature/feature.xml index dd2eaa4f5..9f746051f 100644 --- a/features/karaf/openhab-core/src/main/feature/feature.xml +++ b/features/karaf/openhab-core/src/main/feature/feature.xml @@ -215,8 +215,8 @@ openhab-core-base - openhab.tp;filter:="(feature=paho)" - openhab.tp-paho + openhab.tp;filter:="(feature=hivemqclient)" + openhab.tp-hivemqclient mvn:org.openhab.core.bundles/org.openhab.core.io.transport.mqtt/${project.version} diff --git a/features/karaf/openhab-tp/src/main/feature/feature.xml b/features/karaf/openhab-tp/src/main/feature/feature.xml index 303110ef5..b8a652bdf 100644 --- a/features/karaf/openhab-tp/src/main/feature/feature.xml +++ b/features/karaf/openhab-tp/src/main/feature/feature.xml @@ -57,6 +57,18 @@ mvn:org.eclipse.orbit.bundles/com.google.gson/2.8.2.v20180104-1110 + + openhab.tp;feature=hivemqclient;version=1.1.1 + wrap + openhab.tp-netty + mvn:org.jctools/jctools-core/2.1.2 + mvn:io.reactivex.rxjava2/rxjava/2.2.5 + mvn:org.reactivestreams/reactive-streams/1.0.2 + wrap:mvn:com.google.dagger/dagger/2.20 + mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.javax-inject/1_2 + mvn:com.hivemq/hivemq-mqtt-client/1.1.2 + + openhab.tp;feature=httpclient;version=${jetty.version} mvn:javax.servlet/javax.servlet-api/3.1.0 @@ -190,12 +202,6 @@ mvn:org.mapdb/mapdb/1.0.9 - - openhab.tp;feature=paho;version=1.2.1 - - mvn:org.eclipse.paho/org.eclipse.paho.client.mqttv3/1.2.1 - - openhab.tp;feature=serial;impl=javacomm mvn:org.eclipse.kura/org.eclipse.soda.dk.comm/1.2.201