fix multiple MQTT subscriptions (#1259)

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
This commit is contained in:
J-N-K 2019-12-07 18:44:35 +01:00 committed by Wouter Born
parent 5dbd33ab61
commit 6e962d9dbc
4 changed files with 123 additions and 40 deletions

View File

@ -72,16 +72,14 @@ public class MqttBrokerConnection {
* MQTT transport protocols
*/
public enum Protocol {
TCP,
WEBSOCKETS
TCP, WEBSOCKETS
}
/**
* MQTT version (currently v3 and v5)
*/
public enum MqttVersion {
V3,
V5
V3, V5
}
// Connection parameters
@ -197,9 +195,9 @@ public class MqttBrokerConnection {
* @param port A port or null to select the default port for a secure or insecure connection
* @param secure A secure connection
* @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are
* used for access restriction implementations.
* If none is specified, a default is generated. The client id cannot be longer than 65535
* characters.
* used for access restriction implementations.
* If none is specified, a default is generated. The client id cannot be longer than 65535
* characters.
* @throws IllegalArgumentException If the client id or port is not valid.
*/
public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, @Nullable String clientId) {
@ -214,9 +212,9 @@ public class MqttBrokerConnection {
* @param port A port or null to select the default port for a secure or insecure connection
* @param secure A secure connection
* @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are
* used for access restriction implementations.
* If none is specified, a default is generated. The client id cannot be longer than 65535
* characters.
* used for access restriction implementations.
* If none is specified, a default is generated. The client id cannot be longer than 65535
* characters.
* @throws IllegalArgumentException If the client id or port is not valid.
*/
@Deprecated
@ -234,9 +232,9 @@ public class MqttBrokerConnection {
* @param port A port or null to select the default port for a secure or insecure connection
* @param secure A secure connection
* @param clientId Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are
* used for access restriction implementations.
* If none is specified, a default is generated. The client id cannot be longer than 65535
* characters.
* used for access restriction implementations.
* If none is specified, a default is generated. The client id cannot be longer than 65535
* characters.
* @throws IllegalArgumentException If the client id or port is not valid.
*/
public MqttBrokerConnection(Protocol protocol, MqttVersion mqttVersion, String host, @Nullable Integer port,
@ -427,7 +425,7 @@ public class MqttBrokerConnection {
*
* @param lastWill The last will object or null.
* @param applyImmediately If true, the connection will stopped and started for the new last-will to take effect
* immediately.
* immediately.
* @throws MqttException
* @throws ConfigurationException
*/
@ -483,8 +481,9 @@ public class MqttBrokerConnection {
if (isConnecting) {
return MqttConnectionState.CONNECTING;
}
return (client != null && client.getState().isConnected()) ? MqttConnectionState.CONNECTED
: MqttConnectionState.DISCONNECTED;
return (client != null && client.getState().isConnected()) ?
MqttConnectionState.CONNECTED :
MqttConnectionState.DISCONNECTED;
}
/**
@ -520,7 +519,7 @@ public class MqttBrokerConnection {
* Set the ssl context provider. The default provider is {@see AcceptAllCertifcatesSSLContext}.
*
* @return The ssl context provider. Should not be null, but the ssl context will in fact
* only be used if a ssl:// url is given.
* only be used if a ssl:// url is given.
*/
@Deprecated
public void setSSLContextProvider(SSLContextProvider sslContextProvider) {
@ -561,14 +560,21 @@ public class MqttBrokerConnection {
return future;
}
if (client.getState().isConnected()) {
client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
if (t == null) {
logger.trace("Subscribed {} to topic {}", subscriber, topic);
future.complete(true);
} else {
future.completeExceptionally(new MqttException(t));
}
});
if (!client.isSubscribed(topic)) {
// "real" subscription for first subscriber
client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
if (t == null) {
logger.trace("Subscribed {} to topic {}", subscriber, topic);
future.complete(true);
} else {
future.completeExceptionally(new MqttException(t));
}
});
} else {
// client already subscribed
logger.trace("Subscribed {} to topic {} (follow-up subscription)", subscriber, topic);
future.complete(true);
}
} else {
// The subscription will be performed on connecting.
future.complete(false);
@ -619,7 +625,8 @@ public class MqttBrokerConnection {
}
list.remove(subscriber);
if (!list.isEmpty()) {
logger.trace("Removed {} from topic subscribers for topic {}, but other subscribers present", subscriber, topic);
logger.trace("Removed {} from topic subscribers for topic {}, but other subscribers present",
subscriber, topic);
return CompletableFuture.completedFuture(true);
}
// Remove from subscriber list
@ -627,7 +634,8 @@ public class MqttBrokerConnection {
// No more subscribers to this topic. Unsubscribe topic on the broker
MqttAsyncClientWrapper mqttClient = this.client;
if (mqttClient != null) {
logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from connection", subscriber, topic);
logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from connection",
subscriber, topic);
return unsubscribeRaw(mqttClient, topic);
} else {
return CompletableFuture.completedFuture(false);
@ -641,7 +649,7 @@ public class MqttBrokerConnection {
* @param client The client connection
* @param topic The topic to unsubscribe from
* @return Completes with true if successful. Completes with false if no broker connection is established.
* Exceptionally otherwise.
* Exceptionally otherwise.
*/
protected CompletableFuture<Boolean> unsubscribeRaw(MqttAsyncClientWrapper client, String topic) {
logger.trace("Unsubscribing message consumer for topic '{}' from broker '{}'", topic, host);
@ -691,7 +699,7 @@ public class MqttBrokerConnection {
* do nothing if there is already an active connection.
*
* @return Returns a future that completes with true if already connected or connecting,
* completes with false if a connection timeout has happened and completes exceptionally otherwise.
* completes with false if a connection timeout has happened and completes exceptionally otherwise.
*/
public CompletableFuture<Boolean> start() {
// We don't want multiple concurrent threads to start a connection
@ -877,7 +885,7 @@ public class MqttBrokerConnection {
* @param topic The topic
* @param payload The message payload
* @return Returns a future that completes with a result of true if the publishing succeeded and completes
* exceptionally on an error or with a result of false if no broker connection is established.
* exceptionally on an error or with a result of false if no broker connection is established.
*/
public CompletableFuture<Boolean> publish(String topic, byte[] payload) {
return publish(topic, payload, getQos(), isRetain());
@ -891,7 +899,7 @@ public class MqttBrokerConnection {
* @param qos The quality of service for this message
* @param retain Set to true to retain the message on the broker
* @return Returns a future that completes with a result of true if the publishing succeeded and completes
* exceptionally on an error or with a result of false if no broker connection is established.
* exceptionally on an error or with a result of false if no broker connection is established.
*/
public CompletableFuture<Boolean> publish(String topic, byte[] payload, int qos, boolean retain) {
final MqttAsyncClientWrapper client = this.client;

View File

@ -56,7 +56,7 @@ public class Mqtt3AsyncClientWrapper extends MqttAsyncClientWrapper {
}
client = clientBuilder.buildAsync();
};
}
@Override
public MqttClientState getState() {
@ -64,14 +64,14 @@ public class Mqtt3AsyncClientWrapper extends MqttAsyncClientWrapper {
}
@Override
public CompletableFuture<?> subscribe(String topic, int qos, ClientCallback clientCallback) {
public CompletableFuture<?> internalSubscribe(String topic, int qos, ClientCallback clientCallback) {
Mqtt3Subscribe subscribeMessage = Mqtt3Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos))
.build();
return client.subscribe(subscribeMessage, clientCallback::messageArrived);
}
@Override
public CompletableFuture<?> unsubscribe(String topic) {
public CompletableFuture<?> internalUnsubscribe(String topic) {
Mqtt3Unsubscribe unsubscribeMessage = Mqtt3Unsubscribe.builder().topicFilter(topic).build();
return client.unsubscribe(unsubscribeMessage);
}

View File

@ -57,7 +57,7 @@ public class Mqtt5AsyncClientWrapper extends MqttAsyncClientWrapper {
}
client = clientBuilder.buildAsync();
};
}
@Override
public MqttClientState getState() {
@ -65,14 +65,14 @@ public class Mqtt5AsyncClientWrapper extends MqttAsyncClientWrapper {
}
@Override
public CompletableFuture<?> subscribe(String topic, int qos, ClientCallback clientCallback) {
public CompletableFuture<?> internalSubscribe(String topic, int qos, ClientCallback clientCallback) {
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder().topicFilter(topic).qos(getMqttQosFromInt(qos))
.build();
return client.subscribe(subscribeMessage, clientCallback::messageArrived);
}
@Override
public CompletableFuture<?> unsubscribe(String topic) {
public CompletableFuture<?> internalUnsubscribe(String topic) {
Mqtt5Unsubscribe unsubscribeMessage = Mqtt5Unsubscribe.builder().topicFilter(topic).build();
return client.unsubscribe(unsubscribeMessage);
}

View File

@ -12,6 +12,8 @@
*/
package org.eclipse.smarthome.io.transport.mqtt.internal.client;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jdt.annotation.NonNullByDefault;
@ -21,28 +23,101 @@ import org.eclipse.smarthome.io.transport.mqtt.internal.ClientCallback;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link AbstractMqttAsyncClient} is the base class for async client wrappers
* The {@link MqttAsyncClientWrapper} is the base class for async client wrappers
*
* @author Jan N. Klug - Initial contribution
*/
@NonNullByDefault
public abstract class MqttAsyncClientWrapper {
private final Logger logger = LoggerFactory.getLogger(MqttAsyncClientWrapper.class);
private final List<String> subscribedTopics = new ArrayList<>();
/**
* connect this client
*
* @param lwt last-will and testament (optional)
* @param keepAliveInterval keep-alive interval in ms
* @param username username (optional)
* @param password password (optional)
* @return a CompletableFuture (exceptionally on fail)
*/
public abstract CompletableFuture<?> connect(@Nullable MqttWillAndTestament lwt, int keepAliveInterval,
@Nullable String username, @Nullable String password);
/**
* disconnect this client
*
* @return a CompletableFuture (exceptionally on fail)
*/
public abstract CompletableFuture<Void> disconnect();
/**
* get the connection state of this client
*
* @return the client state
*/
public abstract MqttClientState getState();
public abstract CompletableFuture<?> subscribe(String topic, int qos, ClientCallback clientCallback);
/**
* subscribe a client callback to a topic (keeps track of subscriptions)
*
* @param topic the topic
* @param qos QoS for this subscription
* @param clientCallback the client callback that need
* @return a CompletableFuture (exceptionally on fail)
*/
public CompletableFuture<Boolean> subscribe(String topic, int qos, ClientCallback clientCallback) {
subscribedTopics.add(topic);
return internalSubscribe(topic, qos, clientCallback).thenApply(s -> {
logger.trace("successfully subscribed {} to topic {}", this, topic);
return true;
});
}
public abstract CompletableFuture<?> unsubscribe(String topic);
/**
* subscribe a client callback to a topic (keeps track of subscriptions)
*
* @param topic the topic
* @return a CompletableFuture (exceptionally on fail)
*/
public CompletableFuture<Boolean> unsubscribe(String topic) {
subscribedTopics.remove(topic);
return internalUnsubscribe(topic).thenApply(s -> {
logger.trace("successfully unsubscribed {} from topic {}", this, topic);
return true;
});
}
/**
* publish a message
*
* @param topic the topic
* @param payload the message as byte array
* @param retain whether this message should be retained
* @param qos the QoS level of this message
* @return a CompletableFuture (exceptionally on fail)
*/
public abstract CompletableFuture<?> publish(String topic, byte[] payload, boolean retain, int qos);
/**
* check if this client is subscribed to a topic
*
* @param topic the topic
* @return true if subscribed
*/
public boolean isSubscribed(String topic) {
return subscribedTopics.contains(topic);
}
protected abstract CompletableFuture<?> internalSubscribe(String topic, int qos, ClientCallback clientCallback);
protected abstract CompletableFuture<?> internalUnsubscribe(String topic);
protected MqttQos getMqttQosFromInt(int qos) {
switch (qos) {
case 0: