[mqtt] Properly process retained messages (#3124)

* [mqtt] Properly process retained messages

Only the _first_ message has the retained flag (always on MQTT3,
there is a flag to forward the retain flag on all messages with
MQTT5, but we don't set it), so we have to keep track of previously
retained messages as well.

This shows up as when a thing comes back online and re-subscribes
to a retained message that another thing subscribed to also,
it won't see any changes that happened to that topic since the
very first message was received.

Signed-off-by: Cody Cutrer <cody@cutrer.us>
This commit is contained in:
Cody Cutrer 2022-11-01 02:23:01 -06:00 committed by GitHub
parent d871066efc
commit 3d279eb7f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 8 deletions

View File

@ -44,8 +44,11 @@ public class Subscription {
public void add(MqttMessageSubscriber subscriber) {
if (subscribers.add(subscriber)) {
// new subscriber. deliver all known retained messages
retainedMessages.entrySet().parallelStream()
.forEach(entry -> processMessage(subscriber, entry.getKey(), entry.getValue()));
retainedMessages.entrySet().parallelStream().forEach(entry -> {
if (entry.getValue().length > 0) {
processMessage(subscriber, entry.getKey(), entry.getValue());
}
});
}
}
@ -71,12 +74,12 @@ public class Subscription {
}
public void messageArrived(String topic, byte[] payload, boolean retain) {
if (retain) {
if (payload.length > 0) {
retainedMessages.put(topic, payload);
} else {
retainedMessages.remove(topic);
}
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349265
// Only the first message delivered will have the retain flag; subsequent messages
// will not have the flag set. So see if we retained it in the past, and continue
// to retain it (even if it's now empty - we need to know to continue to retain it)
if (retain || retainedMessages.containsKey(topic)) {
retainedMessages.put(topic, payload);
}
subscribers.parallelStream().forEach(subscriber -> processMessage(subscriber, topic, payload));
}

View File

@ -47,11 +47,16 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
@NonNullByDefault
public class MqttBrokerConnectionTests extends JavaTest {
private static final byte[] HELLO_BYTES = "hello".getBytes();
private static final byte[] GOODBYE_BYTES = "goodbye".getBytes();
private static byte[] eqHelloBytes() {
return eq(HELLO_BYTES);
}
private static byte[] eqGoodbyeBytes() {
return eq(GOODBYE_BYTES);
}
@Test
public void subscribeBeforeOnlineThenConnect()
throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
@ -140,6 +145,28 @@ public class MqttBrokerConnectionTests extends JavaTest {
assertTrue(connection.unsubscribe("topic", subscriber).get(200, TimeUnit.MILLISECONDS));
}
@Test
public void retain()
throws ConfigurationException, MqttException, InterruptedException, ExecutionException, TimeoutException {
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, false,
"MqttBrokerConnectionTests");
MqttMessageSubscriber subscriber1 = mock(MqttMessageSubscriber.class);
MqttMessageSubscriber subscriber2 = mock(MqttMessageSubscriber.class);
connection.subscribe("topic", subscriber1);
Mqtt3Publish publishMessage = Mqtt3Publish.builder().topic("topic").payload(HELLO_BYTES).retain(true).build();
connection.getSubscribers().get("topic").messageArrived(publishMessage);
publishMessage = Mqtt3Publish.builder().topic("topic").payload(GOODBYE_BYTES).build();
connection.getSubscribers().get("topic").messageArrived(publishMessage);
connection.subscribe("topic", subscriber2);
// the retained message was updated even though the subsequent message didn't have the retained flag
verify(subscriber2).processMessage(eq("topic"), eqGoodbyeBytes());
}
@Test
public void reconnectPolicyDefault() throws ConfigurationException, MqttException, InterruptedException {
MqttBrokerConnectionEx connection = new MqttBrokerConnectionEx("123.123.123.123", null, false, false,