From 4f4dfcca20f39c2d8ed059b67e1dba09c9c83957 Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Mon, 28 Mar 2022 20:40:35 +0100 Subject: [PATCH] [openthermgateway] Various improvements (#12507) * [openthermgateway] various tweaks Signed-off-by: Andrew Fiddian-Green --- .../handler/OpenThermGatewayHandler.java | 14 ++ .../internal/OpenThermGatewayCallback.java | 4 +- .../OpenThermGatewaySocketConnector.java | 204 +++++++++++++----- .../OH-INF/thing/openthermgateway.xml | 2 +- 4 files changed, 171 insertions(+), 53 deletions(-) diff --git a/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/handler/OpenThermGatewayHandler.java b/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/handler/OpenThermGatewayHandler.java index 33b7ae7a3cd..06809cd3a5f 100644 --- a/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/handler/OpenThermGatewayHandler.java +++ b/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/handler/OpenThermGatewayHandler.java @@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory; */ @NonNullByDefault public class OpenThermGatewayHandler extends BaseBridgeHandler implements OpenThermGatewayCallback { + private static final String PROPERTY_GATEWAY_ID_NAME = "gatewayId"; + private static final String PROPERTY_GATEWAY_ID_TAG = "PR: A="; private final Logger logger = LoggerFactory.getLogger(OpenThermGatewayHandler.class); @@ -181,6 +183,18 @@ public class OpenThermGatewayHandler extends BaseBridgeHandler implements OpenTh } } + @Override + public void receiveAcknowledgement(String message) { + scheduler.submit(() -> receiveAcknowledgementTask(message)); + } + + private void receiveAcknowledgementTask(String message) { + if (message.startsWith(PROPERTY_GATEWAY_ID_TAG)) { + getThing().setProperty(PROPERTY_GATEWAY_ID_NAME, + message.substring(PROPERTY_GATEWAY_ID_TAG.length()).strip()); + } + } + @Override public void handleRemoval() { logger.debug("Removing OpenThermGateway handler"); diff --git a/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewayCallback.java b/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewayCallback.java index d78a9a35a11..2d6f392d97a 100644 --- a/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewayCallback.java +++ b/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewayCallback.java @@ -17,7 +17,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault; /** * The {@link OpenThermGatewayCallback} is used as a callback interface by a connector to signal status * and relay incoming messages to be processed by the binding. - * + * * @author Arjen Korevaar - Initial contribution */ @NonNullByDefault @@ -26,4 +26,6 @@ public interface OpenThermGatewayCallback { void connectionStateChanged(ConnectionState state); void receiveMessage(Message message); + + void receiveAcknowledgement(String message); } diff --git a/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewaySocketConnector.java b/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewaySocketConnector.java index 7e8f0942dba..c6865c6a9cf 100644 --- a/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewaySocketConnector.java +++ b/bundles/org.openhab.binding.openthermgateway/src/main/java/org/openhab/binding/openthermgateway/internal/OpenThermGatewaySocketConnector.java @@ -20,10 +20,9 @@ import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.AbstractMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; +import java.time.Instant; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -42,11 +41,15 @@ import org.slf4j.LoggerFactory; * * @author Arjen Korevaar - Initial contribution * @author Arjan Mels - Improved robustness by re-sending commands, handling all message types (not only Boiler) + * @author Andrew Fiddian-Green - Improve thread interruption, socket timeouts, exception handling, FIFO command queue */ @NonNullByDefault public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnector { private static final int COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS = 100; private static final int COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS = 5000; + private static final int MAXIMUM_FIFO_BUFFER_SIZE = 20; + + private static final String WDT_RESET_RESPONSE_MESSAGE = "WDT reset"; private final Logger logger = LoggerFactory.getLogger(OpenThermGatewaySocketConnector.class); @@ -61,7 +64,56 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto private @Nullable Future future; private @Nullable ExecutorService executor; - private Map> pendingCommands = new ConcurrentHashMap<>(); + /** + * FIFO queue of commands that are pending being sent to the gateway. That is commands that are either not yet sent, + * or sent but not yet acknowledged and pending possible re-sending. + * + * Note: we must use 'synchronized' when accessing this object to ensure proper thread safety. + */ + private final Queue pendingCommands = new ArrayDeque<>(MAXIMUM_FIFO_BUFFER_SIZE); + + /** + * Wrapper for a command entry in the pending command FIFO queue. + * + * @author AndrewFG - initial contribution + */ + private class PendingCommand { + protected final GatewayCommand command; + protected final Instant expiryTime = Instant.now().plusMillis(COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS); + protected Instant sentTime = Instant.MIN; + + protected PendingCommand(GatewayCommand command) { + this.command = command; + } + + /** + * Check if the command has been sent to the gateway. + * + * @return true if it has been sent + */ + protected boolean sent() { + return Instant.MIN.isBefore(sentTime); + } + + /** + * Check if the command is ready to send (or re-send) to the gateway. + * + * @return true if the command has either not been sent, or sent but not acknowledged within due time i.e. it + * needs to be re-sent + */ + protected boolean readyToSend() { + return sentTime.isBefore(Instant.now().minusMillis(COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS)); + } + + /** + * Check if the command has expired. + * + * @return true if the expiry time has expired + */ + protected boolean expired() { + return Instant.now().isAfter(expiryTime); + } + } public OpenThermGatewaySocketConnector(OpenThermGatewayCallback callback, OpenThermGatewayConfiguration config) { this.callback = callback; @@ -98,12 +150,15 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto String message = reader.readLine(); if (message != null) { + logger.trace("Read: {}", message); handleMessage(message); } else { logger.debug("Received NULL message from OpenTherm Gateway (EOF)"); break; } } + // disable reporting every message (for cleaner re-starting) + sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "1")); } catch (IOException ex) { logger.warn("Error communicating with OpenTherm Gateway: '{}'", ex.getMessage()); } @@ -162,61 +217,108 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto } @Override - public synchronized void sendCommand(GatewayCommand command) { - PrintWriter wrt = writer; - - pendingCommands.put(command.getCode(), - new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), command)); - - String msg = command.toFullString(); - - if (isConnected() && (wrt != null)) { - logger.debug("Sending message: {}", msg); - wrt.print(msg + "\r\n"); - wrt.flush(); - if (wrt.checkError()) { - logger.warn("sendCommand() error sending message to OpenTherm Gateway => PLEASE REPORT !!"); - stop(); + public void sendCommand(GatewayCommand command) { + synchronized (pendingCommands) { + // append the command to the end of the FIFO queue + if (MAXIMUM_FIFO_BUFFER_SIZE < pendingCommands.size() + || !pendingCommands.offer(new PendingCommand(command))) { + logger.warn("Command refused: FIFO buffer overrun => PLEASE REPORT !!"); } + // send the FIFO head command, which may or may not be the one just added + pendingCommandsSendHeadCommandIfReady(); + } // release the pendingCommands lock + } + + /** + * Process the incoming message. Remove any expired commands from the queue. Check if the incoming message is an + * acknowledgement. If it is the acknowledgement for the FIFO head command, remove it from the queue. Try to send + * the (next) FIFO head command, if it exists, and is ready to send. And finally if the message is not an + * acknowledgement, check if it is a valid message, and if so, pass it to the gateway Thing handler for processing. + * + * @param message the incoming message received from the gateway + */ + private void handleMessage(String message) { + // check if the message is a command acknowledgement e.g. having the form "XX: yyy" + boolean isCommandAcknowledgement = (message.length() > 2) && (message.charAt(2) == ':'); + + synchronized (pendingCommands) { + // remove all expired commands + pendingCommandsRemoveAllExpiredCommands(); + + // if acknowledgement is for the FIFO head command, remove it from the queue + if (isCommandAcknowledgement) { + pendingCommandsRemoveHeadCommandIfAcknowledgement(message); + } + + // (re-)send the FIFO head command, if it exists and is ready to send + pendingCommandsSendHeadCommandIfReady(); + } // release the pendingCommands lock + + if (isCommandAcknowledgement) { + callback.receiveAcknowledgement(message); + } else if (message.startsWith(WDT_RESET_RESPONSE_MESSAGE)) { + logger.warn("OpenTherm Gateway was reset by its Watch-Dog Timer!"); } else { - logger.debug("Unable to send message: {}. OpenThermGatewaySocketConnector is not connected.", msg); + Message msg = Message.parse(message); + + // ignore and log bad messages + if (msg == null) { + logger.debug("Received message: {}, (unknown)", message); + return; + } + + // pass good messages to the Thing handler for processing + if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA + || msg.getID() == 0 || msg.getID() == 1) { + callback.receiveMessage(msg); + } } } - private void handleMessage(String message) { - if (message.length() > 2 && message.charAt(2) == ':') { - String code = message.substring(0, 2); - String value = message.substring(3); + /** + * If there is a FIFO head command that is ready to (re-)send, then send it. + */ + private void pendingCommandsSendHeadCommandIfReady() { + // process the command at the head of the queue + PendingCommand headCommand = pendingCommands.peek(); + if (headCommand != null && headCommand.readyToSend()) { + String message = headCommand.command.toFullString(); - logger.debug("Received command confirmation: {}: {}", code, value); - pendingCommands.remove(code); - return; - } - - long currentTime = System.currentTimeMillis(); - - for (Entry timeAndCommand : pendingCommands.values()) { - long responseTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS; - long timeoutTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS; - - if (currentTime > responseTime && currentTime <= timeoutTime) { - logger.debug("Resending command: {}", timeAndCommand.getValue()); - sendCommand(timeAndCommand.getValue()); - } else if (currentTime > timeoutTime) { - pendingCommands.remove(timeAndCommand.getValue().getCode()); + // transmit the command string + PrintWriter writer = this.writer; + if (isConnected() && (writer != null)) { + writer.print(message + "\r\n"); + writer.flush(); + if (writer.checkError()) { + logger.warn("Error sending command to OpenTherm Gateway => PLEASE REPORT !!"); + stop(); + } + if (logger.isTraceEnabled()) { + logger.trace("Sent: {}{}", message, headCommand.sent() ? " (repeat)" : ""); + } + headCommand.sentTime = Instant.now(); + } else { + logger.debug("Unable to send command: {}. OpenThermGatewaySocketConnector is not connected.", message); } } + } - Message msg = Message.parse(message); + /** + * If the acknowledgement message corresponds to the FIFO head command then remove it from the queue. + * + * @param message must be an acknowledgement message in the form "XX: yyy" + */ + private void pendingCommandsRemoveHeadCommandIfAcknowledgement(String message) { + PendingCommand headCommand = pendingCommands.peek(); + if (headCommand != null && headCommand.command.getCode().equals(message.substring(0, 2))) { + pendingCommands.poll(); + } + } - if (msg == null) { - logger.trace("Received message: {}, (unknown)", message); - return; - } - logger.trace("Received message: {}, {} {} {}", message, msg.getID(), msg.getCodeType(), msg.getMessageType()); - if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA - || msg.getID() == 0 || msg.getID() == 1) { - callback.receiveMessage(msg); - } + /** + * Remove all expired commands from the queue. + */ + private void pendingCommandsRemoveAllExpiredCommands() { + pendingCommands.removeIf(pendingCommand -> pendingCommand.expired()); } } diff --git a/bundles/org.openhab.binding.openthermgateway/src/main/resources/OH-INF/thing/openthermgateway.xml b/bundles/org.openhab.binding.openthermgateway/src/main/resources/OH-INF/thing/openthermgateway.xml index ac5a39d6a34..4f4337164f9 100644 --- a/bundles/org.openhab.binding.openthermgateway/src/main/resources/OH-INF/thing/openthermgateway.xml +++ b/bundles/org.openhab.binding.openthermgateway/src/main/resources/OH-INF/thing/openthermgateway.xml @@ -10,7 +10,7 @@ - 2.2.0 + 2.2.1