fix failing subscriptions (#1267)

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
This commit is contained in:
J-N-K 2019-12-11 08:51:18 +01:00 committed by Kai Kreuzer
parent 218bb79dfd
commit ea2ef7e25b

View File

@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
@NonNullByDefault @NonNullByDefault
public abstract class MqttAsyncClientWrapper { public abstract class MqttAsyncClientWrapper {
private final Logger logger = LoggerFactory.getLogger(MqttAsyncClientWrapper.class); private final Logger logger = LoggerFactory.getLogger(MqttAsyncClientWrapper.class);
private final Set<ClientCallback> subscriptions = ConcurrentHashMap.newKeySet(); private final Set<String> subscriptions = ConcurrentHashMap.newKeySet();
/** /**
* connect this client * connect this client
@ -72,7 +72,7 @@ public abstract class MqttAsyncClientWrapper {
* @return a CompletableFuture (exceptionally on fail) * @return a CompletableFuture (exceptionally on fail)
*/ */
public CompletableFuture<Boolean> subscribe(String topic, int qos, ClientCallback clientCallback) { public CompletableFuture<Boolean> subscribe(String topic, int qos, ClientCallback clientCallback) {
boolean needsSubscription = subscriptions.add(clientCallback); boolean needsSubscription = subscriptions.add(topic);
if (needsSubscription) { if (needsSubscription) {
logger.trace("Trying to subscribe {} to topic {}", this, topic); logger.trace("Trying to subscribe {} to topic {}", this, topic);
return internalSubscribe(topic, qos, clientCallback).thenApply(s -> { return internalSubscribe(topic, qos, clientCallback).thenApply(s -> {