From 3d279eb7f193bfb7c279a855d47360cd6f60e78f Mon Sep 17 00:00:00 2001 From: Cody Cutrer Date: Tue, 1 Nov 2022 02:23:01 -0600 Subject: [PATCH] [mqtt] Properly process retained messages (#3124) * [mqtt] Properly process retained messages Only the _first_ message has the retained flag (always on MQTT3, there is a flag to forward the retain flag on all messages with MQTT5, but we don't set it), so we have to keep track of previously retained messages as well. This shows up as when a thing comes back online and re-subscribes to a retained message that another thing subscribed to also, it won't see any changes that happened to that topic since the very first message was received. Signed-off-by: Cody Cutrer --- .../transport/mqtt/internal/Subscription.java | 19 +++++++------ .../mqtt/MqttBrokerConnectionTests.java | 27 +++++++++++++++++++ 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/openhab/core/io/transport/mqtt/internal/Subscription.java b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/openhab/core/io/transport/mqtt/internal/Subscription.java index 0a63df43c..da7d00a20 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/openhab/core/io/transport/mqtt/internal/Subscription.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/main/java/org/openhab/core/io/transport/mqtt/internal/Subscription.java @@ -44,8 +44,11 @@ public class Subscription { public void add(MqttMessageSubscriber subscriber) { if (subscribers.add(subscriber)) { // new subscriber. deliver all known retained messages - retainedMessages.entrySet().parallelStream() - .forEach(entry -> processMessage(subscriber, entry.getKey(), entry.getValue())); + retainedMessages.entrySet().parallelStream().forEach(entry -> { + if (entry.getValue().length > 0) { + processMessage(subscriber, entry.getKey(), entry.getValue()); + } + }); } } @@ -71,12 +74,12 @@ public class Subscription { } public void messageArrived(String topic, byte[] payload, boolean retain) { - if (retain) { - if (payload.length > 0) { - retainedMessages.put(topic, payload); - } else { - retainedMessages.remove(topic); - } + // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349265 + // Only the first message delivered will have the retain flag; subsequent messages + // will not have the flag set. So see if we retained it in the past, and continue + // to retain it (even if it's now empty - we need to know to continue to retain it) + if (retain || retainedMessages.containsKey(topic)) { + retainedMessages.put(topic, payload); } subscribers.parallelStream().forEach(subscriber -> processMessage(subscriber, topic, payload)); } diff --git a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/openhab/core/io/transport/mqtt/MqttBrokerConnectionTests.java b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/openhab/core/io/transport/mqtt/MqttBrokerConnectionTests.java index 7187acba4..e29962e9c 100644 --- a/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/openhab/core/io/transport/mqtt/MqttBrokerConnectionTests.java +++ b/bundles/org.openhab.core.io.transport.mqtt/src/test/java/org/openhab/core/io/transport/mqtt/MqttBrokerConnectionTests.java @@ -47,11 +47,16 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; @NonNullByDefault public class MqttBrokerConnectionTests extends JavaTest { private static final byte[] HELLO_BYTES = "hello".getBytes(); + private static final byte[] GOODBYE_BYTES = "goodbye".getBytes(); private static byte[] eqHelloBytes() { return eq(HELLO_BYTES); } + private static byte[] eqGoodbyeBytes() { + return eq(GOODBYE_BYTES); + } + @Test public void subscribeBeforeOnlineThenConnect() throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException { @@ -140,6 +145,28 @@ public class MqttBrokerConnectionTests extends JavaTest { assertTrue(connection.unsubscribe("topic", subscriber).get(200, TimeUnit.MILLISECONDS)); } + @Test + public void retain() + throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException { + MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, false, + "MqttBrokerConnectionTests"); + + MqttMessageSubscriber subscriber1 = mock(MqttMessageSubscriber.class); + MqttMessageSubscriber subscriber2 = mock(MqttMessageSubscriber.class); + connection.subscribe("topic", subscriber1); + + Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("topic").payload(HELLO_BYTES).retain(true).build(); + connection.getSubscribers().get("topic").messageArrived(publishMessage); + + publishMessage = Mqtt3Publish.builder().topic("topic").payload(GOODBYE_BYTES).build(); + connection.getSubscribers().get("topic").messageArrived(publishMessage); + + connection.subscribe("topic", subscriber2); + + // the retained message was updated even though the subsequent message didn't have the retained flag + verify(subscriber2).processMessage(eq("topic"), eqGoodbyeBytes()); + } + @Test public void reconnectPolicyDefault() throws ConfigurationException, MqttException, InterruptedException { MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, false,