[upb] Fix retry logic (#11342)

* [upb] Fix retry logic

The retry logic was broken so it never retried. This fixes it and adds
unit tests for the serial communication and retry behavior.

Signed-off-by: Marcus Better <marcus@better.se>

* Remove excessive log

Signed-off-by: Marcus Better <marcus@better.se>
This commit is contained in:
Marcus Better 2021-10-09 10:29:09 -04:00 committed by GitHub
parent becb8a48f4
commit d6748cd481
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 226 additions and 29 deletions

View File

@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
import org.openhab.binding.upb.internal.message.MessageBuilder;
import org.openhab.binding.upb.internal.message.MessageParseException;
import org.openhab.binding.upb.internal.message.UPBMessage;
import org.openhab.core.common.NamedThreadFactory;
@ -177,9 +176,13 @@ public class SerialIoThread extends Thread {
listener.incomingMessage(msg);
}
public CompletionStage<CmdStatus> enqueue(final MessageBuilder msg) {
public CompletionStage<CmdStatus> enqueue(final String msg) {
return enqueue(msg, 1);
}
private CompletionStage<CmdStatus> enqueue(final String msg, int numAttempts) {
final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
final Runnable task = new WriteRunnable(msg.build(), completion);
final Runnable task = new WriteRunnable(msg, completion, numAttempts);
try {
writeExecutor.execute(task);
} catch (final RejectedExecutionException e) {
@ -232,23 +235,18 @@ public class SerialIoThread extends Thread {
private final String msg;
private final CompletableFuture<CmdStatus> completion;
private final CountDownLatch ackLatch = new CountDownLatch(1);
private final int numAttempts;
private @Nullable Boolean ack;
public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion) {
public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion, int numAttempts) {
this.msg = msg;
this.completion = completion;
this.numAttempts = numAttempts;
}
// called by reader thread on ACK or NAK
public void ackReceived(final boolean ack) {
if (logger.isDebugEnabled()) {
if (ack) {
logger.debug("ACK received");
} else {
logger.debug("NAK received");
}
}
this.ack = ack;
ackLatch.countDown();
}
@ -262,25 +260,32 @@ public class SerialIoThread extends Thread {
if (out == null) {
throw new IOException("serial port is not writable");
}
for (int tries = 0; tries < MAX_RETRIES && ack == null; tries++) {
out.write(0x14);
out.write(msg.getBytes(US_ASCII));
out.write(0x0d);
out.flush();
final boolean acked = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
if (acked) {
break;
final CmdStatus res;
out.write(0x14);
out.write(msg.getBytes(US_ASCII));
out.write(0x0d);
out.flush();
final boolean latched = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
if (latched) {
final Boolean ack = this.ack;
if (ack == null) {
logger.debug("write not acked, attempt {}", numAttempts);
res = CmdStatus.WRITE_FAILED;
} else if (ack) {
completion.complete(CmdStatus.ACK);
return;
} else {
logger.debug("NAK received, attempt {}", numAttempts);
res = CmdStatus.NAK;
}
logger.debug("ack timed out, retrying ({} of {})", tries + 1, MAX_RETRIES);
}
final Boolean ack = this.ack;
if (ack == null) {
logger.debug("write not acked");
completion.complete(CmdStatus.WRITE_FAILED);
} else if (ack) {
completion.complete(CmdStatus.ACK);
} else {
completion.complete(CmdStatus.NAK);
logger.debug("ack timed out, attempt {}", numAttempts);
res = CmdStatus.WRITE_FAILED;
}
if (numAttempts < MAX_RETRIES) {
enqueue(msg, numAttempts + 1).thenAccept(completion::complete);
} else {
completion.complete(res);
}
} catch (final IOException | InterruptedException e) {
logger.warn("error writing message", e);

View File

@ -155,7 +155,7 @@ public class SerialPIMHandler extends PIMHandler {
public CompletionStage<CmdStatus> sendPacket(final MessageBuilder msg) {
final SerialIoThread receiveThread = this.receiveThread;
if (receiveThread != null) {
return receiveThread.enqueue(msg);
return receiveThread.enqueue(msg.build());
} else {
return exceptionallyCompletedFuture(new IllegalStateException("I/O thread not active"));
}

View File

@ -0,0 +1,192 @@
/**
* Copyright (c) 2010-2021 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.upb.internal;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.openhab.binding.upb.internal.handler.MessageListener;
import org.openhab.binding.upb.internal.handler.SerialIoThread;
import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
import org.openhab.binding.upb.internal.message.Command;
import org.openhab.binding.upb.internal.message.MessageBuilder;
import org.openhab.binding.upb.internal.message.UPBMessage;
import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.thing.ThingUID;
/**
* @author Marcus Better - Initial contribution
*/
public class SerialIoThreadTest {
private static final String ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n";
private final ThingUID thingUID = new ThingUID("a", "b", "c");
private final Listener msgListener = new Listener();
private final PipedOutputStream in = new PipedOutputStream();
private final OutputStreamWriter inbound = new OutputStreamWriter(in, US_ASCII);
private final PipedOutputStream out = new PipedOutputStream();
private @Mock SerialPort serialPort;
private SerialIoThread thread;
private InputStreamReader outbound;
final char[] buf = new char[256];
@BeforeEach
public void setup() throws IOException {
serialPort = mock(SerialPort.class);
outbound = new InputStreamReader(new PipedInputStream(out), US_ASCII);
when(serialPort.getInputStream()).thenReturn(new PipedInputStream(in));
when(serialPort.getOutputStream()).thenReturn(out);
thread = new SerialIoThread(serialPort, msgListener, thingUID);
thread.start();
}
@AfterEach
public void cleanup() {
thread.terminate();
}
@Test
public void testName() {
assertEquals("OH-binding-a:b:c-serial-reader", thread.getName());
assertTrue(thread.isDaemon());
}
@Test
public void receive() throws Exception {
writeInbound("PU8905FA011220FFFF47\r");
final UPBMessage msg = msgListener.readInbound();
assertEquals(Command.ACTIVATE, msg.getCommand());
assertEquals(1, msg.getDestination());
writeInbound("PU8905FA011221FFFF48\r");
final UPBMessage msg2 = msgListener.readInbound();
assertEquals(Command.DEACTIVATE, msg2.getCommand());
verifyMessageModeCmd();
}
@Test
public void send() throws Exception {
final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
.destination((byte) 5).build();
final CompletionStage<CmdStatus> fut = thread.enqueue(msg);
verifyMessageModeCmd();
final int n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
ack();
final CmdStatus res = fut.toCompletableFuture().join();
assertEquals(CmdStatus.ACK, res);
}
@Test
public void resend() throws Exception {
final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
.destination((byte) 5).build();
final CompletableFuture<CmdStatus> fut = thread.enqueue(msg).toCompletableFuture();
verifyMessageModeCmd();
int n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
nak();
// should re-send
n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
assertFalse(fut.isDone());
ack();
final CmdStatus res = fut.join();
assertEquals(CmdStatus.ACK, res);
}
@Test
public void resendMaxAttempts() throws Exception {
final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
.destination((byte) 5).build();
final CompletableFuture<CmdStatus> fut = thread.enqueue(msg).toCompletableFuture();
verifyMessageModeCmd();
int n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
nak();
// retry
n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
assertFalse(fut.isDone());
// no response - wait for ack timeout
// last retry
n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
assertFalse(fut.isDone());
nak();
final CmdStatus res = fut.join();
assertEquals(CmdStatus.NAK, res);
}
private void ack() throws IOException {
writeInbound("PK\r");
}
private void nak() throws IOException {
writeInbound("PN\r");
}
private void writeInbound(String s) throws IOException {
inbound.write(s);
inbound.flush();
}
private void verifyMessageModeCmd() throws IOException {
final int n = outbound.read(buf, 0, ENABLE_MESSAGE_MODE_CMD.length());
assertEquals(ENABLE_MESSAGE_MODE_CMD, new String(buf, 0, n));
}
private static class Listener implements MessageListener {
private final BlockingQueue<UPBMessage> messages = new LinkedBlockingQueue<>();
@Override
public void incomingMessage(final UPBMessage msg) {
messages.offer(msg);
}
@Override
public void onError(final Throwable t) {
}
public UPBMessage readInbound() {
try {
return messages.take();
} catch (InterruptedException e) {
return null;
}
}
}
}