diff --git a/bom/compile/pom.xml b/bom/compile/pom.xml
index be5020c83..2ad3a2caf 100644
--- a/bom/compile/pom.xml
+++ b/bom/compile/pom.xml
@@ -210,11 +210,11 @@
compile
-
+
- org.eclipse.paho
- org.eclipse.paho.client.mqttv3
- 1.2.1
+ com.hivemq
+ hivemq-mqtt-client
+ 1.1.2
compile
diff --git a/bom/runtime/pom.xml b/bom/runtime/pom.xml
index d3259f1eb..1d11be372 100644
--- a/bom/runtime/pom.xml
+++ b/bom/runtime/pom.xml
@@ -450,11 +450,11 @@
compile
-
+
- org.eclipse.paho
- org.eclipse.paho.client.mqttv3
- 1.2.1
+ com.hivemq
+ hivemq-mqtt-client
+ 1.1.2
compile
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java
index de9f10de5..946c3a415 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java
@@ -12,14 +12,12 @@
*/
package org.eclipse.smarthome.io.transport.mqtt;
-import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -29,29 +27,32 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang.StringUtils;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.paho.client.mqttv3.IMqttActionListener;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.IMqttToken;
-import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
-import org.eclipse.smarthome.config.core.ConfigConstants;
import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
-import org.eclipse.smarthome.io.transport.mqtt.internal.MqttActionAdapterCallback;
import org.eclipse.smarthome.io.transport.mqtt.internal.TopicSubscribers;
+import org.eclipse.smarthome.io.transport.mqtt.internal.client.Mqtt3AsyncClientWrapper;
+import org.eclipse.smarthome.io.transport.mqtt.internal.client.Mqtt5AsyncClientWrapper;
+import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy;
import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy;
-import org.eclipse.smarthome.io.transport.mqtt.sslcontext.AcceptAllCertificatesSSLContext;
+import org.eclipse.smarthome.io.transport.mqtt.ssl.CustomTrustManagerFactory;
+import org.eclipse.smarthome.io.transport.mqtt.sslcontext.CustomSSLContextProvider;
import org.eclipse.smarthome.io.transport.mqtt.sslcontext.SSLContextProvider;
import org.osgi.service.cm.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
+import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
+import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
+import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
+
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+
/**
* An MQTTBrokerConnection represents a single client connection to a MQTT broker.
*
@@ -59,7 +60,8 @@ import org.slf4j.LoggerFactory;
*
* @author Davy Vanherbergen - Initial contribution
* @author David Graeff - All operations are async now. More flexible sslContextProvider and reconnectStrategy added.
- * @author Markus Rathgeb - Added connection state callback
+ * @author Markus Rathgeb - added connection state callback
+ * @author Jan N. Klug - changed from PAHO to HiveMQ client
*/
@NonNullByDefault
public class MqttBrokerConnection {
@@ -73,29 +75,39 @@ public class MqttBrokerConnection {
public enum Protocol {
TCP,
WEBSOCKETS
- };
+ }
- /// Connection parameters
+ /**
+ * MQTT version (currently v3 and v5)
+ */
+ public enum MqttVersion {
+ V3,
+ V5
+ }
+
+ // Connection parameters
protected final Protocol protocol;
protected final String host;
protected final int port;
protected final boolean secure;
+ protected final MqttVersion mqttVersion;
+
+ private @Nullable TrustManagerFactory trustManagerFactory = InsecureTrustManagerFactory.INSTANCE;
+ private SSLContextProvider sslContextProvider = new CustomSSLContextProvider(trustManagerFactory);
protected final String clientId;
private @Nullable String user;
private @Nullable String password;
/// Configuration variables
private int qos = DEFAULT_QOS;
+ @Deprecated
private boolean retain = false;
private @Nullable MqttWillAndTestament lastWill;
- private @Nullable Path persistencePath;
protected @Nullable AbstractReconnectStrategy reconnectStrategy;
- private SSLContextProvider sslContextProvider = new AcceptAllCertificatesSSLContext();
private int keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL;
/// Runtime variables
- protected @Nullable MqttAsyncClient client;
- protected @Nullable MqttClientPersistence dataStore;
+ protected @Nullable MqttAsyncClientWrapper client;
protected boolean isConnecting = false;
protected final List connectionObservers = new CopyOnWriteArrayList<>();
@@ -107,12 +119,12 @@ public class MqttBrokerConnection {
private int timeout = 1200; /* Connection timeout in milliseconds */
/**
- * Create a IMqttActionListener object for being used as a callback for a connection attempt.
+ * Create a listener object for being used as a callback for a connection attempt.
* The callback will interact with the {@link AbstractReconnectStrategy} as well as inform registered
* {@link MqttConnectionObserver}s.
*/
- @NonNullByDefault({})
- public class ConnectionCallback implements IMqttActionListener {
+ @NonNullByDefault
+ public class ConnectionCallback implements MqttClientConnectedListener, MqttClientDisconnectedListener {
private final MqttBrokerConnection connection;
private final Runnable cancelTimeoutFuture;
private CompletableFuture future = new CompletableFuture<>();
@@ -122,8 +134,7 @@ public class MqttBrokerConnection {
this.cancelTimeoutFuture = mqttBrokerConnectionImpl::cancelTimeoutFuture;
}
- @Override
- public void onSuccess(IMqttToken asyncActionToken) {
+ public void onConnected(@Nullable MqttClientConnectedContext context) {
cancelTimeoutFuture.run();
connection.isConnecting = false;
@@ -143,15 +154,21 @@ public class MqttBrokerConnection {
});
}
- @Override
- public void onFailure(@Nullable IMqttToken token, @Nullable Throwable error) {
- cancelTimeoutFuture.run();
+ public void onDisconnected(@Nullable MqttClientDisconnectedContext context) {
+ if (context != null) {
+ final Throwable throwable = context.getCause();
+ onDisconnected(throwable);
+ } else {
+ onDisconnected(new Throwable("unknown disconnect reason"));
+ }
+ }
- final Throwable throwable = (token != null && token.getException() != null) ? token.getException() : error;
+ public void onDisconnected(Throwable t) {
+ cancelTimeoutFuture.run();
final MqttConnectionState connectionState = connection.connectionState();
future.complete(false);
- connection.connectionObservers.forEach(o -> o.connectionStateChanged(connectionState, throwable));
+ connection.connectionObservers.forEach(o -> o.connectionStateChanged(connectionState, t));
// If we tried to connect via start(), use the reconnect strategy to try it again
if (connection.isConnecting) {
@@ -168,15 +185,13 @@ public class MqttBrokerConnection {
}
}
- /** Client callback object */
+ // Client callback object
protected ClientCallback clientCallback = new ClientCallback(this, connectionObservers, subscribers);
- /** Connection callback object */
+ // Connection callback object
protected ConnectionCallback connectionCallback;
- /** Action callback object */
- protected IMqttActionListener actionCallback = new MqttActionAdapterCallback();
/**
- * Create a new TCP MQTT client connection to a server with the given host and port.
+ * Create a new TCP MQTT3 client connection to a server with the given host and port.
*
* @param host A host name or address
* @param port A port or null to select the default port for a secure or insecure connection
@@ -188,11 +203,11 @@ public class MqttBrokerConnection {
* @throws IllegalArgumentException If the client id or port is not valid.
*/
public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, @Nullable String clientId) {
- this(Protocol.TCP, host, port, secure, clientId);
+ this(Protocol.TCP, MqttVersion.V3, host, port, secure, clientId);
}
/**
- * Create a new MQTT client connection to a server with the given protocol, host and port.
+ * Create a new MQTT3 client connection to a server with the given protocol, mqtt client version, host and port.
*
* @param protocol The transport protocol
* @param host A host name or address
@@ -204,14 +219,35 @@ public class MqttBrokerConnection {
* characters.
* @throws IllegalArgumentException If the client id or port is not valid.
*/
+ @Deprecated
public MqttBrokerConnection(Protocol protocol, String host, @Nullable Integer port, boolean secure,
@Nullable String clientId) {
+ this(protocol, MqttVersion.V3, host, port, secure, clientId);
+ }
+
+ /**
+ * Create a new MQTT client connection to a server with the given protocol, host and port.
+ *
+ * @param protocol The transport protocol
+ * @param mqttVersion The version of the MQTT client (v3 or v5)
+ * @param host A host name or address
+ * @param port A port or null to select the default port for a secure or insecure connection
+ * @param secure A secure connection
+ * @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are
+ * used for access restriction implementations.
+ * If none is specified, a default is generated. The client id cannot be longer than 65535
+ * characters.
+ * @throws IllegalArgumentException If the client id or port is not valid.
+ */
+ public MqttBrokerConnection(Protocol protocol, MqttVersion mqttVersion, String host, @Nullable Integer port,
+ boolean secure, @Nullable String clientId) {
this.protocol = protocol;
this.host = host;
this.secure = secure;
+ this.mqttVersion = mqttVersion;
String newClientID = clientId;
if (newClientID == null) {
- newClientID = MqttClient.generateClientId();
+ newClientID = UUID.randomUUID().toString();
} else if (newClientID.length() > 65535) {
throw new IllegalArgumentException("Client ID cannot be longer than 65535 characters");
}
@@ -229,7 +265,7 @@ public class MqttBrokerConnection {
* state to the MQTT broker changed.
*
* The reconnect strategy will not be informed if the initial connection to the broker
- * timed out. You need a timeout executor additionally, see {@link #setTimeoutExecutor(Executor)}.
+ * timed out. You need a timeout executor additionally, see {@link #setTimeoutExecutor(ScheduledExecutorService, int)}.
*
* @param reconnectStrategy The reconnect strategy. May not be null.
*/
@@ -257,6 +293,23 @@ public class MqttBrokerConnection {
this.timeout = timeoutInMS;
}
+ public void setTrustManagers(TrustManager[] trustManagers) {
+ if (trustManagers.length != 0) {
+ trustManagerFactory = new CustomTrustManagerFactory(trustManagers);
+ } else {
+ trustManagerFactory = null;
+ }
+ sslContextProvider = new CustomSSLContextProvider(trustManagerFactory);
+ }
+
+ public TrustManager[] getTrustManagers() {
+ if (trustManagerFactory != null) {
+ return trustManagerFactory.getTrustManagers();
+ } else {
+ return new TrustManager[] {};
+ }
+ }
+
/**
* Get the MQTT broker protocol
*/
@@ -264,6 +317,13 @@ public class MqttBrokerConnection {
return protocol;
}
+ /**
+ * Get the MQTT version
+ */
+ public MqttVersion getMqttVersion() {
+ return mqttVersion;
+ }
+
/**
* Get the MQTT broker host
*/
@@ -327,25 +387,29 @@ public class MqttBrokerConnection {
* @param qos level.
*/
public void setQos(int qos) {
- if (qos >= 0 && qos <= 2) {
- this.qos = qos;
- } else {
- throw new IllegalArgumentException("The quality of service parameter must be >=0 and <=2.");
+ if (qos < 0 || qos > 3) {
+ throw new IllegalArgumentException();
}
+ this.qos = qos;
}
/**
+ * use retain flags on message publish instead
+ *
* @return true if newly messages sent to the broker should be retained by the broker.
*/
+ @Deprecated
public boolean isRetain() {
return retain;
}
/**
* Set whether newly published messages should be retained by the broker.
+ * use retain flags on message publish instead
*
* @param retain true to retain.
*/
+ @Deprecated
public void setRetain(boolean retain) {
this.retain = retain;
}
@@ -398,8 +462,8 @@ public class MqttBrokerConnection {
*
* @param persistencePath the path that should be used to store persistent data
*/
+ @Deprecated
public void setPersistencePath(final @Nullable Path persistencePath) {
- this.persistencePath = persistencePath;
}
/**
@@ -418,7 +482,7 @@ public class MqttBrokerConnection {
if (isConnecting) {
return MqttConnectionState.CONNECTING;
}
- return (client != null && client.isConnected()) ? MqttConnectionState.CONNECTED
+ return (client != null && client.getState().isConnected()) ? MqttConnectionState.CONNECTED
: MqttConnectionState.DISCONNECTED;
}
@@ -446,6 +510,7 @@ public class MqttBrokerConnection {
/**
* Return the ssl context provider.
*/
+ @Deprecated
public SSLContextProvider getSSLContextProvider() {
return sslContextProvider;
}
@@ -456,8 +521,10 @@ public class MqttBrokerConnection {
* @return The ssl context provider. Should not be null, but the ssl context will in fact
* only be used if a ssl:// url is given.
*/
+ @Deprecated
public void setSSLContextProvider(SSLContextProvider sslContextProvider) {
this.sslContextProvider = sslContextProvider;
+ trustManagerFactory = new CustomTrustManagerFactory(sslContextProvider);
}
/**
@@ -487,17 +554,19 @@ public class MqttBrokerConnection {
subscribers.put(topic, subscriberList);
subscriberList.add(subscriber);
}
- final MqttAsyncClient client = this.client;
+ final MqttAsyncClientWrapper client = this.client;
if (client == null) {
future.completeExceptionally(new Exception("No MQTT client"));
return future;
}
- if (client.isConnected()) {
- try {
- client.subscribe(topic, qos, future, actionCallback);
- } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
- future.completeExceptionally(e);
- }
+ if (client.getState().isConnected()) {
+ client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
+ if (t == null) {
+ future.complete(true);
+ } else {
+ future.completeExceptionally(new MqttException(t));
+ }
+ });
} else {
// The subscription will be performed on connecting.
future.complete(false);
@@ -514,16 +583,18 @@ public class MqttBrokerConnection {
protected CompletableFuture subscribeRaw(String topic) {
logger.trace("subscribeRaw message consumer for topic '{}' from broker '{}'", topic, host);
CompletableFuture future = new CompletableFuture<>();
- try {
- MqttAsyncClient client = this.client;
- if (client != null && client.isConnected()) {
- client.subscribe(topic, qos, future, actionCallback);
- } else {
- future.complete(false);
- }
- } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
- logger.info("Error subscribing to topic {}", topic, e);
- future.completeExceptionally(e);
+ final MqttAsyncClientWrapper mqttClient = this.client;
+ if (mqttClient != null && mqttClient.getState().isConnected()) {
+ mqttClient.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
+ if (t == null) {
+ future.complete(true);
+ } else {
+ logger.warn("Failed subscribing to topic {}", topic, t);
+ future.completeExceptionally(new MqttException(t));
+ }
+ });
+ } else {
+ future.complete(false);
}
return future;
}
@@ -550,9 +621,9 @@ public class MqttBrokerConnection {
// Remove from subscriber list
subscribers.remove(topic);
// No more subscribers to this topic. Unsubscribe topic on the broker
- MqttAsyncClient client = this.client;
- if (client != null) {
- return unsubscribeRaw(client, topic);
+ MqttAsyncClientWrapper mqttClient = this.client;
+ if (mqttClient != null) {
+ return unsubscribeRaw(mqttClient, topic);
} else {
return CompletableFuture.completedFuture(false);
}
@@ -567,18 +638,19 @@ public class MqttBrokerConnection {
* @return Completes with true if successful. Completes with false if no broker connection is established.
* Exceptionally otherwise.
*/
- protected CompletableFuture unsubscribeRaw(MqttAsyncClient client, String topic) {
+ protected CompletableFuture unsubscribeRaw(MqttAsyncClientWrapper client, String topic) {
logger.trace("Unsubscribing message consumer for topic '{}' from broker '{}'", topic, host);
CompletableFuture future = new CompletableFuture<>();
- try {
- if (client.isConnected()) {
- client.unsubscribe(topic, future, actionCallback);
- } else {
- future.complete(false);
- }
- } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
- logger.info("Error unsubscribing topic from broker", e);
- future.completeExceptionally(e);
+ if (client.getState().isConnected()) {
+ client.unsubscribe(topic).whenComplete((s, t) -> {
+ if (t == null) {
+ future.complete(true);
+ } else {
+ future.completeExceptionally(new MqttException(t));
+ }
+ });
+ } else {
+ return CompletableFuture.completedFuture(false);
}
return future;
}
@@ -608,33 +680,6 @@ public class MqttBrokerConnection {
return !connectionObservers.isEmpty();
}
- /**
- * Create a MqttConnectOptions object using the fields of this MqttBrokerConnection instance.
- * Package local, for testing.
- */
- MqttConnectOptions createMqttOptions() throws ConfigurationException {
- MqttConnectOptions options = new MqttConnectOptions();
-
- if (!StringUtils.isBlank(user)) {
- options.setUserName(user);
- }
- if (!StringUtils.isBlank(password) && password != null) {
- options.setPassword(password.toCharArray());
- }
- if (secure) {
- options.setSocketFactory(sslContextProvider.getContext().getSocketFactory());
- }
-
- if (lastWill != null) {
- MqttWillAndTestament lastWill = this.lastWill; // Make eclipse happy
- options.setWill(lastWill.getTopic(), lastWill.getPayload(), lastWill.getQos(), lastWill.isRetain());
- }
-
- options.setKeepAliveInterval(keepAliveInterval);
- options.setHttpsHostnameVerificationEnabled(false);
- return options;
- }
-
/**
* This will establish a connection to the MQTT broker and if successful, notify all
* publishers and subscribers that the connection has become active. This method will
@@ -661,76 +706,29 @@ public class MqttBrokerConnection {
}
// Close client if there is still one existing
- if (client != null) {
- try {
- client.close();
- } catch (org.eclipse.paho.client.mqttv3.MqttException ignore) {
- }
- client = null;
+
+ if (this.client != null) {
+ this.client.disconnect();
+ this.client = null;
}
CompletableFuture future = connectionCallback.createFuture();
- StringBuilder serverURI = new StringBuilder();
- switch (protocol) {
- case TCP:
- serverURI.append(secure ? "ssl://" : "tcp://");
- break;
- case WEBSOCKETS:
- serverURI.append(secure ? "wss://" : "ws://");
- break;
- default:
- future.completeExceptionally(new ConfigurationException("protocol", "Protocol unknown"));
- return future;
- }
- serverURI.append(host);
- serverURI.append(":");
- serverURI.append(port);
-
- // Storage
- Path persistencePath = this.persistencePath;
- if (persistencePath == null) {
- persistencePath = Paths.get(ConfigConstants.getUserDataFolder()).resolve("mqtt").resolve(host);
- }
- try {
- persistencePath = Files.createDirectories(persistencePath);
- } catch (IOException e) {
- future.completeExceptionally(new MqttException(e));
- return future;
- }
- MqttDefaultFilePersistence localDataStore = new MqttDefaultFilePersistence(persistencePath.toString());
-
// Create the client
- MqttAsyncClient localClient;
- try {
- localClient = createClient(serverURI.toString(), clientId, localDataStore);
- } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
- future.completeExceptionally(new MqttException(e));
- return future;
- }
+ MqttAsyncClientWrapper client = createClient();
+ this.client = client;
- // Assign to object
- this.client = localClient;
- this.dataStore = localDataStore;
+ // connect
+ client.connect(lastWill, keepAliveInterval, user, password);
- // Connect
- localClient.setCallback(clientCallback);
- try {
- MqttConnectOptions mqttConnectOptions = createMqttOptions();
- mqttConnectOptions.setMaxInflight(16384); // 1/4 of available message ids
- localClient.connect(mqttConnectOptions, null, connectionCallback);
- logger.info("Starting MQTT broker connection to '{}' with clientid {} and file store '{}'", host,
- getClientId(), persistencePath);
- } catch (org.eclipse.paho.client.mqttv3.MqttException | ConfigurationException e) {
- future.completeExceptionally(new MqttException(e));
- return future;
- }
+ logger.info("Starting MQTT broker connection to '{}' with clientid {}", host, getClientId());
// Connect timeout
ScheduledExecutorService executor = timeoutExecutor;
if (executor != null) {
final ScheduledFuture> timeoutFuture = this.timeoutFuture.getAndSet(executor.schedule(
- () -> connectionCallback.onFailure(null, new TimeoutException()), timeout, TimeUnit.MILLISECONDS));
+ () -> connectionCallback.onDisconnected(new TimeoutException("connect timed out")), timeout,
+ TimeUnit.MILLISECONDS));
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
@@ -738,18 +736,14 @@ public class MqttBrokerConnection {
return future;
}
- /**
- * Encapsulates the creation of the paho MqttAsyncClient
- *
- * @param serverURI A paho uri like ssl://host:port, tcp://host:port, ws[s]://host:port
- * @param clientId the mqtt client ID
- * @param dataStore The datastore to save qos!=0 messages until they are delivered.
- * @return Returns a valid MqttAsyncClient
- * @throws org.eclipse.paho.client.mqttv3.MqttException
- */
- protected MqttAsyncClient createClient(String serverURI, String clientId, MqttClientPersistence dataStore)
- throws org.eclipse.paho.client.mqttv3.MqttException {
- return new MqttAsyncClient(serverURI, clientId, dataStore);
+ protected MqttAsyncClientWrapper createClient() {
+ if (mqttVersion == MqttVersion.V3) {
+ return new Mqtt3AsyncClientWrapper(host, port, clientId, protocol, secure, connectionCallback,
+ trustManagerFactory);
+ } else {
+ return new Mqtt5AsyncClientWrapper(host, port, clientId, protocol, secure, connectionCallback,
+ trustManagerFactory);
+ }
}
/**
@@ -760,20 +754,11 @@ public class MqttBrokerConnection {
* @return Returns the value of the parameter v.
*/
protected boolean finalizeStopAfterDisconnect(boolean v) {
- if (client != null) {
- try {
- client.close();
- } catch (Exception ignore) {
- }
- }
- client = null;
- if (dataStore != null) {
- try {
- dataStore.close();
- } catch (Exception ignore) {
- }
- dataStore = null;
+ final MqttAsyncClientWrapper client = this.client;
+ if (client != null && connectionState() != MqttConnectionState.DISCONNECTED) {
+ client.disconnect();
}
+ this.client = null;
connectionObservers.forEach(o -> o.connectionStateChanged(MqttConnectionState.DISCONNECTED, null));
return v;
}
@@ -784,7 +769,7 @@ public class MqttBrokerConnection {
* @return Returns a future that completes as soon as all subscriptions have been canceled.
*/
public CompletableFuture unsubscribeAll() {
- MqttAsyncClient client = this.client;
+ MqttAsyncClientWrapper client = this.client;
List> futures = new ArrayList<>();
if (client != null) {
subscribers.forEach((topic, subList) -> {
@@ -804,7 +789,7 @@ public class MqttBrokerConnection {
* @return Returns a future that completes as soon as the disconnect process has finished.
*/
public CompletableFuture stop() {
- MqttAsyncClient client = this.client;
+ MqttAsyncClientWrapper client = this.client;
if (client == null) {
return CompletableFuture.completedFuture(true);
}
@@ -825,19 +810,15 @@ public class MqttBrokerConnection {
CompletableFuture future = new CompletableFuture<>();
// Close connection
- if (client.isConnected()) {
- // We need to thread change here. Because paho does not allow to disconnect within a callback method
- unsubscribeAll().thenRunAsync(() -> {
- try {
- client.disconnect(100).waitForCompletion(100);
- if (client.isConnected()) {
- client.disconnectForcibly();
+ if (client.getState().isConnected()) {
+ unsubscribeAll().thenRun(() -> {
+ client.disconnect().whenComplete((m, t) -> {
+ if (t == null) {
+ future.complete(true);
+ } else {
+ future.complete(false);
}
- future.complete(true);
- } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
- logger.debug("Error while closing connection to broker", e);
- future.complete(false);
- }
+ });
});
} else {
future.complete(true);
@@ -846,6 +827,18 @@ public class MqttBrokerConnection {
return future.thenApply(this::finalizeStopAfterDisconnect);
}
+ /**
+ * Publish a message to the broker.
+ *
+ * @param topic The topic
+ * @param payload The message payload
+ * @param listener A listener to be notified of success or failure of the delivery.
+ */
+ @Deprecated
+ public void publish(String topic, byte[] payload, MqttActionCallback listener) {
+ publish(topic, payload, getQos(), isRetain(), listener);
+ }
+
/**
* Publish a message to the broker with the given QoS and retained flag.
*
@@ -855,30 +848,22 @@ public class MqttBrokerConnection {
* @param retain Set to true to retain the message on the broker
* @param listener A listener to be notified of success or failure of the delivery.
*/
+ @Deprecated
public void publish(String topic, byte[] payload, int qos, boolean retain, MqttActionCallback listener) {
- MqttAsyncClient localClient = client;
- if (localClient == null) {
- listener.onFailure(topic, new MqttException(0));
+ final MqttAsyncClientWrapper client = this.client;
+ if (client == null) {
+ listener.onFailure(topic, new MqttException(new Throwable()));
return;
}
- try {
- IMqttDeliveryToken deliveryToken = localClient.publish(topic, payload, qos, retain, listener,
- actionCallback);
- logger.debug("Publishing message {} to topic '{}'", deliveryToken.getMessageId(), topic);
- } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
- listener.onFailure(topic, new MqttException(e));
- }
- }
- /**
- * Publish a message to the broker.
- *
- * @param topic The topic
- * @param payload The message payload
- * @param listener A listener to be notified of success or failure of the delivery.
- */
- public void publish(String topic, byte[] payload, MqttActionCallback listener) {
- publish(topic, payload, qos, retain, listener);
+ client.publish(topic, payload, retain, qos).whenComplete((m, t) -> {
+ if (t != null) {
+ listener.onFailure(topic, new MqttException(t));
+ } else {
+ listener.onSuccess(topic);
+ }
+ });
+ logger.debug("Publishing message to topic '{}'", topic);
}
/**
@@ -890,7 +875,7 @@ public class MqttBrokerConnection {
* exceptionally on an error or with a result of false if no broker connection is established.
*/
public CompletableFuture publish(String topic, byte[] payload) {
- return publish(topic, payload, qos, retain);
+ return publish(topic, payload, getQos(), isRetain());
}
/**
@@ -900,23 +885,26 @@ public class MqttBrokerConnection {
* @param payload The message payload
* @param qos The quality of service for this message
* @param retain Set to true to retain the message on the broker
- * @param listener An optional listener to be notified of success or failure of the delivery.
* @return Returns a future that completes with a result of true if the publishing succeeded and completes
* exceptionally on an error or with a result of false if no broker connection is established.
*/
public CompletableFuture publish(String topic, byte[] payload, int qos, boolean retain) {
- MqttAsyncClient client = this.client;
+ final MqttAsyncClientWrapper client = this.client;
if (client == null) {
return CompletableFuture.completedFuture(false);
}
+
// publish message asynchronously
- CompletableFuture f = new CompletableFuture<>();
- try {
- client.publish(topic, payload, qos, retain, f, actionCallback);
- } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
- f.completeExceptionally(new MqttException(e));
- }
- return f;
+ CompletableFuture future = new CompletableFuture<>();
+ client.publish(topic, payload, retain, qos).whenComplete((m, t) -> {
+ if (t == null) {
+ future.complete(true);
+ } else {
+ future.completeExceptionally(new MqttException(t));
+ }
+ });
+
+ return future;
}
/**
@@ -929,4 +917,5 @@ public class MqttBrokerConnection {
timeoutFuture.cancel(false);
}
}
+
}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionConfig.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionConfig.java
index dd88f1340..041a1e4e3 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionConfig.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionConfig.java
@@ -34,6 +34,7 @@ public class MqttBrokerConnectionConfig {
public @Nullable String clientID;
// MQTT parameters
public Integer qos = MqttBrokerConnection.DEFAULT_QOS;
+ @Deprecated
public Boolean retainMessages = false;
/** Keepalive in seconds */
public @Nullable Integer keepAlive;
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttException.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttException.java
index 730801a70..850e2213a 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttException.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttException.java
@@ -12,12 +12,15 @@
*/
package org.eclipse.smarthome.io.transport.mqtt;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
/**
* Thrown if an error occurs communicating with the server. The exception contains a reason code. The semantic of the
* reason code depends on the underlying implementation.
*
* @author David Graeff - Initial contribution
*/
+@NonNullByDefault
public class MqttException extends Exception {
private static final long serialVersionUID = 301L;
private int reasonCode;
@@ -29,10 +32,22 @@ public class MqttException extends Exception {
*
* @param reasonCode the reason code for the exception.
*/
+ @Deprecated
public MqttException(int reasonCode) {
+ this.cause = new Exception();
this.reasonCode = reasonCode;
}
+ /**
+ * Constructs a new MqttException
with the specified reason
+ *
+ * @param reason the reason for the exception.
+ */
+ public MqttException(String reason) {
+ this.cause = new Exception("reason");
+ this.reasonCode = Integer.MIN_VALUE;
+ }
+
/**
* Constructs a new MqttException
with the specified
* Throwable
as the underlying reason.
@@ -40,12 +55,7 @@ public class MqttException extends Exception {
* @param cause the underlying cause of the exception.
*/
public MqttException(Throwable cause) {
- if (cause instanceof org.eclipse.paho.client.mqttv3.MqttException) {
- org.eclipse.paho.client.mqttv3.MqttException internalException = (org.eclipse.paho.client.mqttv3.MqttException) cause;
- this.reasonCode = internalException.getReasonCode();
- } else {
- this.reasonCode = org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_EXCEPTION;
- }
+ this.reasonCode = Integer.MIN_VALUE;
this.cause = cause;
}
@@ -56,6 +66,7 @@ public class MqttException extends Exception {
* @param reason the reason code for the exception.
* @param cause the underlying cause of the exception.
*/
+ @Deprecated
public MqttException(int reason, Throwable cause) {
this.reasonCode = reason;
this.cause = cause;
@@ -66,6 +77,7 @@ public class MqttException extends Exception {
*
* @return the code representing the reason for this exception.
*/
+ @Deprecated
public int getReasonCode() {
return reasonCode;
}
@@ -86,11 +98,7 @@ public class MqttException extends Exception {
*/
@Override
public String getMessage() {
- if (cause != null) {
- return cause.getMessage();
- }
-
- return "MqttException with reason " + String.valueOf(reasonCode);
+ return cause.getMessage();
}
/**
@@ -98,14 +106,6 @@ public class MqttException extends Exception {
*/
@Override
public String toString() {
- if (cause instanceof org.eclipse.paho.client.mqttv3.MqttException) {
- org.eclipse.paho.client.mqttv3.MqttException internalException = (org.eclipse.paho.client.mqttv3.MqttException) cause;
- return internalException.toString();
- }
- String result = getMessage() + " (" + reasonCode + ")";
- if (cause != null) {
- result = result + " - " + cause.toString();
- }
- return result;
+ return cause.toString();
}
}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ClientCallback.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ClientCallback.java
index e30c28fa7..b2a3a0360 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ClientCallback.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ClientCallback.java
@@ -16,10 +16,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import com.hivemq.client.mqtt.datatypes.MqttTopic;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection;
import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionObserver;
import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionState;
@@ -29,12 +28,17 @@ import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrate
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
+import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
+
/**
- * Processes the paho MqttCallbacks for the {@link MqttBrokerConnection}.
+ * Processes the MqttCallbacks for the {@link MqttBrokerConnection}.
*
* @author David Graeff - Initial contribution
+ * @author Jan N. Klug - adjusted to HiveMQ client
*/
-public class ClientCallback implements MqttCallback {
+@NonNullByDefault
+public class ClientCallback {
final Logger logger = LoggerFactory.getLogger(ClientCallback.class);
private final MqttBrokerConnection connection;
private final List connectionObservers;
@@ -47,12 +51,11 @@ public class ClientCallback implements MqttCallback {
this.subscribers = subscribers;
}
- @Override
public synchronized void connectionLost(@Nullable Throwable exception) {
if (exception instanceof MqttException) {
MqttException e = (MqttException) exception;
- logger.info("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}", connection.getHost(),
- e.getMessage(), e.getReasonCode(), (e.getCause() == null ? "Unknown" : e.getCause().getMessage()));
+ logger.info("MQTT connection to '{}' was lost: {} : Cause : {}", connection.getHost(), e.getMessage(),
+ e.getCause().getMessage());
} else if (exception != null) {
logger.info("MQTT connection to '{}' was lost", connection.getHost(), exception);
}
@@ -64,30 +67,33 @@ public class ClientCallback implements MqttCallback {
}
}
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- logger.trace("Message with id {} delivered.", token.getMessageId());
+ public void messageArrived(Mqtt3Publish message) {
+ messageArrived(message.getTopic(), message.getPayloadAsBytes());
}
- @Override
- public void messageArrived(String topic, MqttMessage message) {
- byte[] payload = message.getPayload();
+ public void messageArrived(Mqtt5Publish message) {
+ messageArrived(message.getTopic(), message.getPayloadAsBytes());
+ }
+
+ private void messageArrived(MqttTopic topic, byte[] payload) {
+ String topicString = topic.toString();
logger.trace("Received message on topic '{}' : {}", topic, new String(payload));
- List matches = new ArrayList<>();
+
+ List matchingSubscribers = new ArrayList<>();
synchronized (subscribers) {
subscribers.values().forEach(subscriberList -> {
- if (subscriberList.topicMatch(topic)) {
+ if (subscriberList.topicMatch(topicString)) {
logger.trace("Topic match for '{}' using regex {}", topic, subscriberList.getTopicRegexPattern());
- subscriberList.forEach(consumer -> matches.add(consumer));
+ subscriberList.forEach(consumer -> matchingSubscribers.add(consumer));
} else {
logger.trace("No topic match for '{}' using regex {}", topic,
subscriberList.getTopicRegexPattern());
}
});
-
}
+
try {
- matches.forEach(subscriber -> subscriber.processMessage(topic, payload));
+ matchingSubscribers.forEach(subscriber -> subscriber.processMessage(topicString, payload));
} catch (Exception e) {
logger.error("MQTT message received. MqttMessageSubscriber#processMessage() implementation failure", e);
}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttActionAdapterCallback.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttActionAdapterCallback.java
deleted file mode 100644
index ca923bedd..000000000
--- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttActionAdapterCallback.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Copyright (c) 2010-2019 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.eclipse.smarthome.io.transport.mqtt.internal;
-
-import java.util.concurrent.CompletableFuture;
-
-import org.eclipse.paho.client.mqttv3.IMqttActionListener;
-import org.eclipse.paho.client.mqttv3.IMqttToken;
-import org.eclipse.smarthome.io.transport.mqtt.MqttActionCallback;
-
-/**
- * Create a IMqttActionListener object for being used as a callback for a publish attempt.
- *
- * @author David Graeff - Initial contribution
- */
-public class MqttActionAdapterCallback implements IMqttActionListener {
- @Override
- public void onSuccess(IMqttToken token) {
- if (token.getUserContext() instanceof MqttActionCallback) {
- MqttActionCallback subscriber = (MqttActionCallback) token.getUserContext();
- subscriber.onSuccess(token.getTopics()[0]);
- } else if (token.getUserContext() instanceof CompletableFuture) {
- @SuppressWarnings("unchecked")
- CompletableFuture future = (CompletableFuture) token.getUserContext();
- future.complete(true);
- }
- }
-
- @Override
- public void onFailure(IMqttToken token, Throwable throwable) {
- if (token.getUserContext() instanceof MqttActionCallback) {
- MqttActionCallback subscriber = (MqttActionCallback) token.getUserContext();
- subscriber.onFailure(token.getTopics()[0], throwable);
- } else if (token.getUserContext() instanceof CompletableFuture) {
- @SuppressWarnings("unchecked")
- CompletableFuture future = (CompletableFuture) token.getUserContext();
- future.completeExceptionally(throwable);
- }
- }
-}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnectionServiceInstance.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnectionServiceInstance.java
index a6fdecf2a..05c6b33e5 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnectionServiceInstance.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttBrokerConnectionServiceInstance.java
@@ -15,7 +15,6 @@ package org.eclipse.smarthome.io.transport.mqtt.internal;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
-import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.config.core.Configuration;
@@ -68,10 +67,11 @@ public class MqttBrokerConnectionServiceInstance {
connection.stop();
}
- if (configMap == null || configMap.isEmpty() || mqttService == null) {
+ final MqttServiceImpl service = (MqttServiceImpl) mqttService;
+
+ if (configMap == null || configMap.isEmpty() || service == null) {
return;
}
- final @NonNull MqttServiceImpl service = (@NonNull MqttServiceImpl) mqttService;
// Parse configuration
MqttBrokerConnectionConfig config = new Configuration(configMap).as(MqttBrokerConnectionConfig.class);
@@ -79,7 +79,7 @@ public class MqttBrokerConnectionServiceInstance {
try {
// Compute brokerID and make sure it is not empty
String brokerID = config.getBrokerID();
- if (StringUtils.isBlank(brokerID) || brokerID == null) {
+ if (StringUtils.isBlank(brokerID)) {
logger.warn("Ignore invalid broker connection configuration: {}", config);
return;
}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java
new file mode 100644
index 000000000..c18677cf8
--- /dev/null
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt3AsyncClientWrapper.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright (c) 2010-2019 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.smarthome.io.transport.mqtt.internal.client;
+
+import java.util.concurrent.CompletableFuture;
+
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.ConnectionCallback;
+import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.Protocol;
+import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament;
+import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
+
+import com.hivemq.client.mqtt.MqttClientState;
+import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
+import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
+import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder;
+import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3Connect;
+import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder;
+import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
+import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe;
+import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe;
+
+/**
+ * The {@link Mqtt3AsyncClientWrapper} provides the wrapper for Mqttv3 async clients
+ *
+ * @author Jan N. Klug - Initial contribution
+ */
+@NonNullByDefault
+public class Mqtt3AsyncClientWrapper extends MqttAsyncClientWrapper {
+ private final Mqtt3AsyncClient client;
+
+ public Mqtt3AsyncClientWrapper(String host, int port, String clientId, Protocol protocol, boolean secure,
+ ConnectionCallback connectionCallback, @Nullable TrustManagerFactory trustManagerFactory) {
+ Mqtt3ClientBuilder clientBuilder = Mqtt3Client.builder().serverHost(host).serverPort(port).identifier(clientId)
+ .addConnectedListener(connectionCallback).addDisconnectedListener(connectionCallback);
+
+ if (protocol == Protocol.WEBSOCKETS) {
+ clientBuilder.webSocketWithDefaultConfig();
+ }
+ if (secure) {
+ clientBuilder.sslWithDefaultConfig().sslConfig().trustManagerFactory(trustManagerFactory).applySslConfig();
+ }
+
+ client = clientBuilder.buildAsync();
+ };
+
+ @Override
+ public MqttClientState getState() {
+ return client.getState();
+ }
+
+ @Override
+ public CompletableFuture> subscribe(String topic, int qos, ClientCallback clientCallback) {
+ Mqtt3Subscribe subscribeMessage = Mqtt3Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos))
+ .build();
+ return client.subscribe(subscribeMessage, clientCallback::messageArrived);
+ }
+
+ @Override
+ public CompletableFuture> unsubscribe(String topic) {
+ Mqtt3Unsubscribe unsubscribeMessage = Mqtt3Unsubscribe.builder().topicFilter(topic).build();
+ return client.unsubscribe(unsubscribeMessage);
+ }
+
+ @Override
+ public CompletableFuture publish(String topic, byte[] payload, boolean retain, int qos) {
+ Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic(topic).qos(getMqttQosFromInt(qos)).payload(payload)
+ .retain(retain).build();
+ return client.publish(publishMessage);
+ }
+
+ @Override
+ public CompletableFuture> connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval,
+ @Nullable String username, @Nullable String password) {
+ Mqtt3ConnectBuilder connectMessageBuilder = Mqtt3Connect.builder().keepAlive(keepAliveInterval);
+ if (lwt != null) {
+ Mqtt3Publish willPublish = Mqtt3Publish.builder().topic(lwt.getTopic()).payload(lwt.getPayload())
+ .retain(lwt.isRetain()).qos(getMqttQosFromInt(lwt.getQos())).build();
+ connectMessageBuilder.willPublish(willPublish);
+ }
+
+ if (!StringUtils.isBlank(username) && !StringUtils.isBlank(password) && password != null) {
+ connectMessageBuilder.simpleAuth().username(username).password(password.getBytes()).applySimpleAuth();
+ }
+
+ return client.connect(connectMessageBuilder.build());
+ }
+
+ @Override
+ public CompletableFuture disconnect() {
+ return client.disconnect();
+ }
+
+}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java
new file mode 100644
index 000000000..9f01d5986
--- /dev/null
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/Mqtt5AsyncClientWrapper.java
@@ -0,0 +1,109 @@
+/**
+ * Copyright (c) 2010-2019 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.smarthome.io.transport.mqtt.internal.client;
+
+import java.util.concurrent.CompletableFuture;
+
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.ConnectionCallback;
+import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection.Protocol;
+import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament;
+import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
+
+import com.hivemq.client.mqtt.MqttClientState;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
+import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
+import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
+import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
+
+/**
+ * The {@link Mqtt5AsyncClientWrapper} provides the wrapper for Mqttv3 async clients
+ *
+ * @author Jan N. Klug - Initial contribution
+ */
+@NonNullByDefault
+public class Mqtt5AsyncClientWrapper extends MqttAsyncClientWrapper {
+ private final Mqtt5AsyncClient client;
+
+ public Mqtt5AsyncClientWrapper(String host, int port, String clientId, Protocol protocol, boolean secure,
+ ConnectionCallback connectionCallback, @Nullable TrustManagerFactory trustManagerFactory) {
+ Mqtt5ClientBuilder clientBuilder = Mqtt5Client.builder().serverHost(host).serverPort(port).identifier(clientId)
+ .addConnectedListener(connectionCallback).addDisconnectedListener(connectionCallback);
+
+ if (protocol == Protocol.WEBSOCKETS) {
+ clientBuilder.webSocketWithDefaultConfig();
+ }
+ if (secure) {
+ clientBuilder.sslWithDefaultConfig().sslConfig().trustManagerFactory(trustManagerFactory).applySslConfig();
+ }
+
+ client = clientBuilder.buildAsync();
+ };
+
+ @Override
+ public MqttClientState getState() {
+ return client.getState();
+ }
+
+ @Override
+ public CompletableFuture> subscribe(String topic, int qos, ClientCallback clientCallback) {
+ Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos))
+ .build();
+ return client.subscribe(subscribeMessage, clientCallback::messageArrived);
+ }
+
+ @Override
+ public CompletableFuture> unsubscribe(String topic) {
+ Mqtt5Unsubscribe unsubscribeMessage = Mqtt5Unsubscribe.builder().topicFilter(topic).build();
+ return client.unsubscribe(unsubscribeMessage);
+ }
+
+ @Override
+ public CompletableFuture publish(String topic, byte[] payload, boolean retain, int qos) {
+ Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(getMqttQosFromInt(qos)).payload(payload)
+ .retain(retain).build();
+ return client.publish(publishMessage);
+ }
+
+ @Override
+ public CompletableFuture> connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval,
+ @Nullable String username, @Nullable String password) {
+ Mqtt5ConnectBuilder connectMessageBuilder = Mqtt5Connect.builder().keepAlive(keepAliveInterval);
+ if (lwt != null) {
+ Mqtt5Publish willPublish = Mqtt5Publish.builder().topic(lwt.getTopic()).payload(lwt.getPayload())
+ .retain(lwt.isRetain()).qos(getMqttQosFromInt(lwt.getQos())).build();
+ connectMessageBuilder.willPublish(willPublish);
+ }
+
+ if (!StringUtils.isBlank(username) && !StringUtils.isBlank(password) && password != null) {
+ connectMessageBuilder.simpleAuth().username(username).password(password.getBytes()).applySimpleAuth();
+ }
+
+ return client.connect(connectMessageBuilder.build());
+ }
+
+ @Override
+ public CompletableFuture disconnect() {
+ return client.disconnect();
+ }
+
+}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java
new file mode 100644
index 000000000..b754837a3
--- /dev/null
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/client/MqttAsyncClientWrapper.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright (c) 2010-2019 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.smarthome.io.transport.mqtt.internal.client;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.smarthome.io.transport.mqtt.MqttWillAndTestament;
+import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
+
+import com.hivemq.client.mqtt.MqttClientState;
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+
+/**
+ * The {@link AbstractMqttAsyncClient} is the base class for async client wrappers
+ *
+ * @author Jan N. Klug - Initial contribution
+ */
+
+@NonNullByDefault
+public abstract class MqttAsyncClientWrapper {
+ public abstract CompletableFuture> connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval,
+ @Nullable String username, @Nullable String password);
+
+ public abstract CompletableFuture disconnect();
+
+ public abstract MqttClientState getState();
+
+ public abstract CompletableFuture> subscribe(String topic, int qos, ClientCallback clientCallback);
+
+ public abstract CompletableFuture> unsubscribe(String topic);
+
+ public abstract CompletableFuture> publish(String topic, byte[] payload, boolean retain, int qos);
+
+ protected MqttQos getMqttQosFromInt(int qos) {
+ switch (qos) {
+ case 0:
+ return MqttQos.AT_LEAST_ONCE;
+ case 1:
+ return MqttQos.AT_MOST_ONCE;
+ case 2:
+ return MqttQos.EXACTLY_ONCE;
+ default:
+ throw new IllegalArgumentException("QoS needs to be 0, 1 or 2.");
+ }
+ }
+}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/ssl/CustomTrustManagerFactory.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/ssl/CustomTrustManagerFactory.java
new file mode 100644
index 000000000..27d732f90
--- /dev/null
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/ssl/CustomTrustManagerFactory.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (c) 2010-2019 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.smarthome.io.transport.mqtt.ssl;
+
+import java.lang.reflect.Field;
+import java.security.KeyStore;
+
+import javax.net.ssl.ManagerFactoryParameters;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.smarthome.io.transport.mqtt.sslcontext.SSLContextProvider;
+import org.osgi.service.cm.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.ssl.util.SimpleTrustManagerFactory;
+
+/**
+ * The {@link CustomTrustManagerFactory} is a TrustManagerFactory that provides a custom {@link TrustManager}
+ *
+ * @author Jan N. Klug - Initial contribution
+ */
+@NonNullByDefault
+public class CustomTrustManagerFactory extends SimpleTrustManagerFactory {
+ private final Logger logger = LoggerFactory.getLogger(CustomTrustManagerFactory.class);
+ private final TrustManager[] trustManagers;
+
+ public CustomTrustManagerFactory(TrustManager[] trustManagers) {
+ this.trustManagers = trustManagers;
+ }
+
+ @Deprecated
+ public CustomTrustManagerFactory(SSLContextProvider contextProvider) {
+ TrustManager[] tm;
+ try {
+ SSLContext ctx = contextProvider.getContext();
+
+ // get SSLContextImpl
+ Field contextSpiField = ctx.getClass().getDeclaredField("contextSpi");
+ contextSpiField.setAccessible(true);
+ Object sslContextImpl = contextSpiField.get(ctx);
+ Class> sslContextImplClass = sslContextImpl.getClass().getSuperclass().getSuperclass();
+
+ // get trustmanager
+ Field trustManagerField = sslContextImplClass.getDeclaredField("trustManager");
+ trustManagerField.setAccessible(true);
+ Object trustManagerObj = trustManagerField.get(sslContextImpl);
+
+ tm = new TrustManager[] { (X509TrustManager) trustManagerObj };
+ } catch (IllegalAccessException | NoSuchFieldException | ConfigurationException e) {
+ logger.warn("using default insecure trustmanager, could not extract trustmanager from SSL context:", e);
+ tm = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();
+ }
+ trustManagers = tm;
+ }
+
+ @Override
+ protected void engineInit(@Nullable KeyStore keyStore) throws Exception {
+ }
+
+ @Override
+ protected void engineInit(@Nullable ManagerFactoryParameters managerFactoryParameters) throws Exception {
+ }
+
+ @Override
+ protected TrustManager[] engineGetTrustManagers() {
+ return trustManagers;
+ }
+
+}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/AcceptAllCertificatesSSLContext.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/AcceptAllCertificatesSSLContext.java
index 819eda6d2..196edd911 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/AcceptAllCertificatesSSLContext.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/AcceptAllCertificatesSSLContext.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
*
* @author David Graeff - Initial contribution
*/
+@Deprecated
public class AcceptAllCertificatesSSLContext implements SSLContextProvider {
private final Logger logger = LoggerFactory.getLogger(AcceptAllCertificatesSSLContext.class);
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/CustomSSLContextProvider.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/CustomSSLContextProvider.java
new file mode 100644
index 000000000..29b7973c2
--- /dev/null
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/CustomSSLContextProvider.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2010-2019 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.smarthome.io.transport.mqtt.sslcontext;
+
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.osgi.service.cm.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This SSLContextProvider returns an {@link SSLContext} that accepts all connections and doesn't perform any
+ * certificate validations. This implementation forces a TLS v1.2 {@link SSLContext} instance.
+ *
+ * @author Jan N. Klug - Initial contribution
+ */
+@Deprecated
+public class CustomSSLContextProvider implements SSLContextProvider {
+ private final Logger logger = LoggerFactory.getLogger(CustomSSLContextProvider.class);
+ private final TrustManagerFactory factory;
+
+ public CustomSSLContextProvider(TrustManagerFactory factory) {
+ this.factory = factory;
+ }
+
+ @Override
+ public SSLContext getContext() throws ConfigurationException {
+ try {
+ if (factory == null) {
+ return SSLContext.getDefault();
+ } else {
+ SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
+ sslContext.init(null, factory.getTrustManagers(), null);
+ return sslContext;
+ }
+ } catch (KeyManagementException | NoSuchAlgorithmException e) {
+ logger.warn("SSL configuration failed", e);
+ throw new ConfigurationException("ssl", e.getMessage());
+ }
+ }
+
+}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/SSLContextProvider.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/SSLContextProvider.java
index 841775aa7..1f158bd7f 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/SSLContextProvider.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/sslcontext/SSLContextProvider.java
@@ -24,6 +24,7 @@ import org.osgi.service.cm.ConfigurationException;
*
* @author David Graeff - Initial contribution
*/
+@Deprecated
public interface SSLContextProvider {
/**
* Return an {@link SSLContext} to be used by secure Mqtt broker connections. Never return null here. If you are not
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttAsyncClientEx.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttAsyncClientEx.java
deleted file mode 100644
index 0e7d964c8..000000000
--- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttAsyncClientEx.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Copyright (c) 2010-2019 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.eclipse.smarthome.io.transport.mqtt;
-
-import static org.mockito.Mockito.*;
-
-import java.util.Collections;
-
-import org.eclipse.paho.client.mqttv3.IMqttActionListener;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.IMqttToken;
-import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
-import org.eclipse.paho.client.mqttv3.MqttSecurityException;
-
-/**
- * We need an extended MqttAsyncClientEx to overwrite the connection state.
- *
- * In respect to the success flags the operations publish, subscribe, unsubscribe, connect,
- * disconnect immediately succeed or fail.
- *
- * @author David Graeff - Initial contribution
- */
-public class MqttAsyncClientEx extends MqttAsyncClient {
- public MqttBrokerConnectionEx connection;
-
- public MqttAsyncClientEx(String serverURI, String clientId, MqttClientPersistence dataStore,
- MqttBrokerConnectionEx connection) throws MqttException {
- super(serverURI, clientId, dataStore);
- this.connection = connection;
- }
-
- IMqttToken getToken(Object userContext, IMqttActionListener callback, String topic) {
- IMqttToken t = mock(IMqttToken.class);
- doReturn(userContext).when(t).getUserContext();
- doReturn(true).when(t).isComplete();
- doReturn(Collections.singletonList(topic).toArray(new String[1])).when(t).getTopics();
- doReturn(MqttAsyncClientEx.this).when(t).getClient();
- doReturn(callback).when(t).getActionCallback();
- doReturn(null).when(t).getException();
- return t;
- }
-
- IMqttDeliveryToken getDeliveryToken(Object userContext, IMqttActionListener callback, String topic) {
- IMqttDeliveryToken t = mock(IMqttDeliveryToken.class);
- doReturn(userContext).when(t).getUserContext();
- doReturn(true).when(t).isComplete();
- doReturn(Collections.singletonList(topic).toArray(new String[1])).when(t).getTopics();
- doReturn(MqttAsyncClientEx.this).when(t).getClient();
- doReturn(callback).when(t).getActionCallback();
- doReturn(null).when(t).getException();
- return t;
- }
-
- @Override
- public boolean isConnected() {
- return connection.connectionStateOverwrite == MqttConnectionState.CONNECTED;
- }
-
- @Override
- public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext,
- IMqttActionListener callback) throws MqttException, MqttPersistenceException {
- if (connection.publishSuccess) {
- callback.onSuccess(getToken(userContext, callback, topic));
- } else {
- callback.onFailure(getToken(userContext, callback, topic), new MqttException(0));
- }
- return getDeliveryToken(userContext, callback, topic);
- }
-
- @Override
- public IMqttToken subscribe(String topic, int qos, Object userContext, IMqttActionListener callback)
- throws MqttException {
- if (connection.publishSuccess) {
- callback.onSuccess(getToken(userContext, callback, topic));
- } else {
- callback.onFailure(getToken(userContext, callback, topic), new MqttException(0));
- }
- return getToken(userContext, callback, topic);
- }
-
- @Override
- public IMqttToken unsubscribe(String topic, Object userContext, IMqttActionListener callback) throws MqttException {
- if (connection.unsubscribeSuccess) {
- callback.onSuccess(getToken(userContext, callback, topic));
- } else {
- callback.onFailure(getToken(userContext, callback, topic), new MqttException(0));
- }
- return getToken(userContext, callback, topic);
- }
-
- @Override
- public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback)
- throws MqttException {
- connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
- if (callback != null) {
- if (connection.disconnectSuccess) {
- callback.onSuccess(getToken(userContext, callback, null));
- } else {
- callback.onFailure(getToken(userContext, callback, null), new MqttException(0));
- }
- }
- return getToken(userContext, callback, null);
- }
-
- @Override
- public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
- throws MqttException, MqttSecurityException {
- if (!connection.connectTimeout) {
- connection.connectionStateOverwrite = MqttConnectionState.CONNECTED;
- if (connection.connectSuccess) {
- callback.onSuccess(getToken(userContext, callback, null));
- } else {
- callback.onFailure(getToken(userContext, callback, null), new MqttException(0));
- }
- } else {
- connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
- }
- return getToken(userContext, callback, null);
- }
-}
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionEx.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionEx.java
index aef4127f7..271bf2079 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionEx.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionEx.java
@@ -12,22 +12,27 @@
*/
package org.eclipse.smarthome.io.transport.mqtt;
-import static org.mockito.Mockito.spy;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.CompletableFuture;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
+
+import com.hivemq.client.mqtt.MqttClientState;
/**
* We need an extended MqttBrokerConnection to overwrite the protected `connectionCallbacks` with
* an instance that takes the mocked version of `MqttBrokerConnection` and overwrite the connection state.
*
- * We also replace the internal MqttAsyncClient with a spied one, that in respect to the success flags
+ * We also mock the internal Mqtt3AsyncClient that in respect to the success flags
* immediately succeed or fail with publish, subscribe, unsubscribe, connect, disconnect.
*
* @author David Graeff - Initial contribution
+ * @author Jan N. Klug - adjusted to HiveMQ client
*/
@NonNullByDefault
public class MqttBrokerConnectionEx extends MqttBrokerConnection {
@@ -48,9 +53,50 @@ public class MqttBrokerConnectionEx extends MqttBrokerConnection {
}
@Override
- protected MqttAsyncClient createClient(String serverURI, String clientId, MqttClientPersistence dataStore)
- throws org.eclipse.paho.client.mqttv3.MqttException {
- return spy(new MqttAsyncClientEx(serverURI, clientId, dataStore, this));
+ protected MqttAsyncClientWrapper createClient() {
+ MqttAsyncClientWrapper mockedClient = mock(MqttAsyncClientWrapper.class);
+ // connect
+ doAnswer(i -> {
+ if (!connectTimeout) {
+ connectionCallback.onConnected(null);
+ connectionStateOverwrite = MqttConnectionState.CONNECTED;
+ return CompletableFuture.completedFuture(null);
+ }
+ return new CompletableFuture();
+ }).when(mockedClient).connect(any(), anyInt(), any(), any());
+ doAnswer(i -> {
+ if (disconnectSuccess) {
+ connectionCallback.onDisconnected(new Throwable("disconnect"));
+ connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
+ return CompletableFuture.completedFuture(null);
+ }
+ return new CompletableFuture();
+ }).when(mockedClient).disconnect();
+ // subscribe
+ doAnswer(i -> {
+ if (subscribeSuccess) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ CompletableFuture future = new CompletableFuture<>();
+ future.completeExceptionally(new Throwable("subscription failed"));
+ return future;
+ }
+ }).when(mockedClient).subscribe(any(), anyInt(), any());
+ // unsubscribe
+ doAnswer(i -> {
+ if (unsubscribeSuccess) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ CompletableFuture future = new CompletableFuture<>();
+ future.completeExceptionally(new Throwable("unsubscription failed"));
+ return future;
+ }
+ }).when(mockedClient).unsubscribe(any());
+ // state
+ doAnswer(i -> {
+ return MqttClientState.CONNECTED;
+ }).when(mockedClient).getState();
+ return mockedClient;
}
@Override
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java
index 9531eebfe..213fe5abe 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java
@@ -28,22 +28,25 @@ import java.util.concurrent.TimeoutException;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.paho.client.mqttv3.IMqttToken;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.smarthome.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy;
import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy;
import org.junit.Test;
import org.osgi.service.cm.ConfigurationException;
+import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
+import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
+
/**
* Tests the MqttBrokerConnection class
*
* @author David Graeff - Initial contribution
+ * @author Jan N. Klug - adjusted to HiveMQ client
*/
public class MqttBrokerConnectionTests {
@Test
- public void subscribeBeforeOnlineThenConnect() throws ConfigurationException, MqttException, InterruptedException,
- ExecutionException, TimeoutException, org.eclipse.paho.client.mqttv3.MqttException {
+ public void subscribeBeforeOnlineThenConnect()
+ throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false,
"MqttBrokerConnectionTests");
@@ -55,14 +58,16 @@ public class MqttBrokerConnectionTests {
assertTrue(connection.hasSubscribers());
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
+ Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("homie/device123/$name").payload("hello".getBytes())
+ .build();
// Test if subscription is active
- connection.clientCallback.messageArrived("homie/device123/$name", new MqttMessage("hello".getBytes()));
+ connection.clientCallback.messageArrived(publishMessage);
verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
}
@Test
- public void subscribeToWildcardTopic() throws ConfigurationException, MqttException, InterruptedException,
- ExecutionException, TimeoutException, org.eclipse.paho.client.mqttv3.MqttException {
+ public void subscribeToWildcardTopic()
+ throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false,
"MqttBrokerConnectionTests");
@@ -80,7 +85,9 @@ public class MqttBrokerConnectionTests {
assertTrue(connection.hasSubscribers());
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
- connection.clientCallback.messageArrived("homie/device123/$name", new MqttMessage("hello".getBytes()));
+ Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("homie/device123/$name").payload("hello".getBytes())
+ .build();
+ connection.clientCallback.messageArrived(publishMessage);
verify(subscriber).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
verify(subscriber2).processMessage(eq("homie/device123/$name"), eq("hello".getBytes()));
@@ -88,8 +95,8 @@ public class MqttBrokerConnectionTests {
}
@Test
- public void subscriber() throws ConfigurationException, MqttException, InterruptedException, ExecutionException,
- TimeoutException, org.eclipse.paho.client.mqttv3.MqttException {
+ public void subscriber()
+ throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false,
"MqttBrokerConnectionTests");
@@ -116,7 +123,7 @@ public class MqttBrokerConnectionTests {
// Add subscriber (while connected)
CompletableFuture future = connection.subscribe("topic", subscriber);
- verify(connection.client).subscribe(any(), anyInt(), any(), any());
+ verify(connection.client).subscribe(any(), anyInt(), any());
assertTrue(future.get(200, TimeUnit.MILLISECONDS));
// Remove subscriber (while connected)
@@ -155,11 +162,9 @@ public class MqttBrokerConnectionTests {
// Fake a disconnect
connection.setReconnectStrategy(mockPolicy);
doReturn(MqttConnectionState.DISCONNECTED).when(connection).connectionState();
- IMqttToken token = mock(IMqttToken.class);
- when(token.getException()).thenReturn(new org.eclipse.paho.client.mqttv3.MqttException(1));
connection.isConnecting = true; /* Pretend that start did something */
- connection.connectionCallback.onFailure(token, null);
+ connection.connectionCallback.onDisconnected(new Throwable("disconnected"));
// Check lostConnect
verify(mockPolicy).lostConnection();
@@ -168,7 +173,7 @@ public class MqttBrokerConnectionTests {
assertTrue(mockPolicy.isReconnecting());
// Fake connection established
- connection.connectionCallback.onSuccess(token);
+ connection.connectionCallback.onConnected(null);
assertFalse(mockPolicy.isReconnecting());
}
@@ -221,8 +226,8 @@ public class MqttBrokerConnectionTests {
CompletableFuture future = connection.start();
verify(connection.connectionCallback).createFuture();
- verify(connection.connectionCallback, times(0)).onSuccess(any());
- verify(connection.connectionCallback, times(0)).onFailure(any(), any());
+ verify(connection.connectionCallback, times(0)).onConnected(any());
+ verify(connection.connectionCallback, times(0)).onDisconnected(any(MqttClientDisconnectedContext.class));
assertNotNull(connection.timeoutFuture);
assertThat(future.get(70, TimeUnit.MILLISECONDS), is(false));
@@ -245,17 +250,13 @@ public class MqttBrokerConnectionTests {
// Cause a success callback
connection.connectionStateOverwrite = MqttConnectionState.CONNECTED;
- connection.connectionCallback.onSuccess(null);
+ connection.connectionCallback.onConnected(null);
verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.CONNECTED), isNull());
- // Cause a failure callback with a mocked token
- IMqttToken token = mock(IMqttToken.class);
- org.eclipse.paho.client.mqttv3.MqttException testException = new org.eclipse.paho.client.mqttv3.MqttException(
- 1);
- when(token.getException()).thenReturn(testException);
+ Exception testException = new Exception("test message");
connection.connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
- connection.connectionCallback.onFailure(token, null);
+ connection.connectionCallback.onDisconnected(testException);
verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.DISCONNECTED),
eq(testException));
@@ -318,7 +319,6 @@ public class MqttBrokerConnectionTests {
assertEquals(1, connection.getQos());
// Check for default ssl context provider and reconnect policy
- assertNotNull(connection.getSSLContextProvider());
assertNotNull(connection.getReconnectStrategy());
assertThat(connection.connectionState(), equalTo(MqttConnectionState.DISCONNECTED));
@@ -326,8 +326,8 @@ public class MqttBrokerConnectionTests {
@SuppressWarnings("null")
@Test
- public void gracefulStop() throws ConfigurationException, MqttException, InterruptedException, ExecutionException,
- TimeoutException, org.eclipse.paho.client.mqttv3.MqttException {
+ public void gracefulStop()
+ throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = spy(
new MqttBrokerConnectionEx("123.123.123.123", null, false, "MqttBrokerConnectionTests"));
@@ -340,7 +340,7 @@ public class MqttBrokerConnectionTests {
assertThat(connection.hasSubscribers(), is(true));
// Let's observe the internal connection client
- MqttAsyncClientEx client = (MqttAsyncClientEx) connection.client;
+ MqttAsyncClientWrapper client = connection.client;
// Stop
CompletableFuture future = connection.stop();
@@ -353,7 +353,7 @@ public class MqttBrokerConnectionTests {
future.get(1000, TimeUnit.MILLISECONDS);
verify(connection).unsubscribeAll();
- verify(client).disconnect(anyLong(), any(), any());
+ verify(client).disconnect();
// Subscribers should be removed
assertThat(connection.hasSubscribers(), is(false));
diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttServiceTests.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttServiceTests.java
index 8fd3274f9..baa7ffc3d 100644
--- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttServiceTests.java
+++ b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttServiceTests.java
@@ -40,7 +40,7 @@ public class MqttServiceTests {
service.addBrokersListener(observer);
assertTrue(service.hasBrokerObservers());
- MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("tcp://123.123.123.123", null, false,
+ MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false,
"brokerConnectionListenerTests");
assertTrue(service.addBrokerConnection("name", connection));
diff --git a/features/karaf/openhab-core/src/main/feature/feature.xml b/features/karaf/openhab-core/src/main/feature/feature.xml
index dd2eaa4f5..9f746051f 100644
--- a/features/karaf/openhab-core/src/main/feature/feature.xml
+++ b/features/karaf/openhab-core/src/main/feature/feature.xml
@@ -215,8 +215,8 @@
openhab-core-base
- openhab.tp;filter:="(feature=paho)"
- openhab.tp-paho
+ openhab.tp;filter:="(feature=hivemqclient)"
+ openhab.tp-hivemqclient
mvn:org.openhab.core.bundles/org.openhab.core.io.transport.mqtt/${project.version}
diff --git a/features/karaf/openhab-tp/src/main/feature/feature.xml b/features/karaf/openhab-tp/src/main/feature/feature.xml
index 303110ef5..b8a652bdf 100644
--- a/features/karaf/openhab-tp/src/main/feature/feature.xml
+++ b/features/karaf/openhab-tp/src/main/feature/feature.xml
@@ -57,6 +57,18 @@
mvn:org.eclipse.orbit.bundles/com.google.gson/2.8.2.v20180104-1110
+
+ openhab.tp;feature=hivemqclient;version=1.1.1
+ wrap
+ openhab.tp-netty
+ mvn:org.jctools/jctools-core/2.1.2
+ mvn:io.reactivex.rxjava2/rxjava/2.2.5
+ mvn:org.reactivestreams/reactive-streams/1.0.2
+ wrap:mvn:com.google.dagger/dagger/2.20
+ mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.javax-inject/1_2
+ mvn:com.hivemq/hivemq-mqtt-client/1.1.2
+
+
openhab.tp;feature=httpclient;version=${jetty.version}
mvn:javax.servlet/javax.servlet-api/3.1.0
@@ -190,12 +202,6 @@
mvn:org.mapdb/mapdb/1.0.9
-
- openhab.tp;feature=paho;version=1.2.1
-
- mvn:org.eclipse.paho/org.eclipse.paho.client.mqttv3/1.2.1
-
-
openhab.tp;feature=serial;impl=javacomm
mvn:org.eclipse.kura/org.eclipse.soda.dk.comm/1.2.201