[insteon] Refactor iostream transport classes (#17930)

Signed-off-by: jsetton <jeremy.setton@gmail.com>
Signed-off-by: Ciprian Pascu <contact@ciprianpascu.ro>
This commit is contained in:
Jeremy 2024-12-21 18:47:36 -05:00 committed by Ciprian Pascu
parent 1c26aea3a0
commit 1bd25ff7ca
8 changed files with 92 additions and 188 deletions

View File

@ -16,7 +16,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Objects;
@ -34,8 +33,6 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.openhab.binding.insteon.internal.utils.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements IOStream for an Insteon Hub 2
@ -47,10 +44,8 @@ import org.slf4j.LoggerFactory;
*/
@NonNullByDefault
public class HubIOStream extends IOStream {
private final Logger logger = LoggerFactory.getLogger(HubIOStream.class);
private static final String BS_START = "<BS>";
private static final String BS_END = "</BS>";
private static final String BUFFER_TAG_START = "<BS>";
private static final String BUFFER_TAG_END = "</BS>";
private static final int REQUEST_TIMEOUT = 30; // in seconds
private String host;
@ -61,7 +56,7 @@ public class HubIOStream extends IOStream {
private ScheduledExecutorService scheduler;
private @Nullable ScheduledFuture<?> job;
// index of the last byte we have read in the buffer
private int bufferIdx = -1;
private volatile int bufferIdx = -1;
/**
* Constructor
@ -84,14 +79,9 @@ public class HubIOStream extends IOStream {
this.scheduler = scheduler;
}
@Override
public boolean isOpen() {
return job != null;
}
@Override
public boolean open() {
if (isOpen()) {
if (job != null) {
logger.warn("hub stream is already open");
return false;
}
@ -120,93 +110,65 @@ public class HubIOStream extends IOStream {
@Override
public void close() {
super.close();
ScheduledFuture<?> job = this.job;
if (job != null) {
job.cancel(true);
this.job = null;
}
InputStream in = this.in;
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}
OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
}
/**
* Fetches the latest status buffer from the Hub
* Returns the latest buffer from the Hub
*
* @return string with status buffer
* @return the buffer string
* @throws IOException
*/
private synchronized String bufferStatus() throws IOException {
private String getBuffer() throws IOException {
String result = getURL("/buffstatus.xml");
int start = result.indexOf(BS_START);
if (start == -1) {
throw new IOException("malformed bufferstatus.xml");
}
start += BS_START.length();
int end = result.indexOf(BS_END, start);
if (end == -1) {
throw new IOException("malformed bufferstatus.xml");
int start = result.indexOf(BUFFER_TAG_START);
int end = result.indexOf(BUFFER_TAG_END, start);
if (start == -1 || end == -1) {
throw new IOException("malformed buffstatus.xml");
}
start += BUFFER_TAG_START.length();
return result.substring(start, end).trim();
}
/**
* Sends command to Hub to clear the status buffer
* Clears the Hub buffer
*
* @throws IOException
*/
private synchronized void clearBuffer() throws IOException {
private void clearBuffer() throws IOException {
logger.trace("clearing buffer");
getURL("/1?XB=M=1");
bufferIdx = 0;
}
/**
* Sends Insteon message (byte array) as a readable ascii string to the Hub
* Sends a message to the Hub
*
* @param msg byte array representing the Insteon message
* @throws IOException in case of I/O error
* @param b byte array representing the Insteon message
* @throws IOException
*/
public synchronized void write(ByteBuffer msg) throws IOException {
poll(); // fetch the status buffer before we send out commands
StringBuilder b = new StringBuilder();
while (msg.remaining() > 0) {
b.append(String.format("%02x", msg.get()));
}
String hexMsg = b.toString();
logger.trace("writing a message");
getURL("/3?" + hexMsg + "=I=3");
private void sendMessage(byte[] b) throws IOException {
poll(); // poll the status buffer before we send the message
logger.trace("sending a message");
getURL("/3?" + HexUtils.getHexString(b) + "=I=3");
bufferIdx = 0;
}
/**
* Polls the Hub web interface to fetch the status buffer
* Polls the Hub buffer and add to input stream
*
* @throws IOException if something goes wrong with I/O
* @throws IOException
*/
private synchronized void poll() throws IOException {
String buffer = bufferStatus(); // fetch via http call
private void poll() throws IOException {
String buffer = getBuffer();
logger.trace("poll: {}", buffer);
// The Hub maintains a ring buffer where the last two digits (in hex!) represent
// the position of the last byte read.
@ -249,10 +211,9 @@ public class HubIOStream extends IOStream {
logger.trace("no wrap: appending new data: {}", msg);
}
if (msg.length() != 0) {
byte[] array = HexUtils.toByteArray(msg.toString());
ByteBuffer buf = ByteBuffer.wrap(array);
byte[] b = HexUtils.toByteArray(msg.toString());
if (in instanceof HubInputStream hubInput) {
hubInput.handle(buf);
hubInput.add(b);
} else {
logger.debug("hub input stream is null");
}
@ -310,10 +271,10 @@ public class HubIOStream extends IOStream {
// A buffer to keep bytes while we are waiting for the inputstream to read
private ReadByteBuffer buffer = new ReadByteBuffer(1024);
public void handle(ByteBuffer b) throws IOException {
public void add(byte[] b) throws IOException {
// Make sure we cleanup as much space as possible
buffer.makeCompact();
buffer.add(b.array());
buffer.add(b);
}
@Override
@ -342,18 +303,18 @@ public class HubIOStream extends IOStream {
@Override
public void write(int b) throws IOException {
out.write(b);
flushBuffer();
flush();
}
@Override
public void write(byte @Nullable [] b, int off, int len) throws IOException {
out.write(b, off, len);
flushBuffer();
flush();
}
private void flushBuffer() throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray());
HubIOStream.this.write(buffer);
@Override
public void flush() throws IOException {
sendMessage(out.toByteArray());
out.reset();
}
}

View File

@ -26,6 +26,8 @@ import org.openhab.binding.insteon.internal.config.InsteonHub1Configuration;
import org.openhab.binding.insteon.internal.config.InsteonHub2Configuration;
import org.openhab.binding.insteon.internal.config.InsteonPLMConfiguration;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract class for implementation for I/O stream with anything that looks
@ -38,6 +40,7 @@ import org.openhab.core.io.transport.serial.SerialPortManager;
*/
@NonNullByDefault
public abstract class IOStream {
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected @Nullable InputStream in;
protected @Nullable OutputStream out;
@ -47,20 +50,17 @@ public abstract class IOStream {
*
* @param b byte array (output)
* @return number of bytes read
* @throws InterruptedException
* @throws IOException
*/
public int read(byte @Nullable [] b) throws InterruptedException, IOException {
int len = 0;
while (len == 0) {
if (!isOpen()) {
throw new IOException("io stream not open");
}
InputStream in = this.in;
if (in != null) {
len = in.read(b);
} else {
if (in == null) {
throw new IOException("input stream not defined");
}
int len = 0;
while (len == 0) {
len = in.read(b);
if (Thread.interrupted()) {
throw new InterruptedException();
@ -77,27 +77,16 @@ public abstract class IOStream {
* Writes data to IOStream
*
* @param b byte array to write
* @throws IOException
*/
public void write(byte @Nullable [] b) throws InterruptedException, IOException {
if (!isOpen()) {
throw new IOException("io stream not open");
}
public void write(byte @Nullable [] b) throws IOException {
OutputStream out = this.out;
if (out != null) {
out.write(b);
} else {
if (out == null) {
throw new IOException("output stream not defined");
}
out.write(b);
}
/**
* Returns if IOStream is open
*
* @return true if stream is open, false if not
*/
public abstract boolean isOpen();
/**
* Opens the IOStream
*
@ -108,7 +97,27 @@ public abstract class IOStream {
/**
* Closes the IOStream
*/
public abstract void close();
public void close() {
InputStream in = this.in;
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}
OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
}
/**
* Creates an IOStream from an insteon bridge config object

View File

@ -205,9 +205,7 @@ public class LegacyPort {
mdbb.stop();
}
if (ioStream.isOpen()) {
ioStream.close();
}
ScheduledFuture<?> readJob = this.readJob;
if (readJob != null) {

View File

@ -135,9 +135,7 @@ public class Port {
connected.set(false);
if (ioStream.isOpen()) {
ioStream.close();
}
ScheduledFuture<?> readJob = this.readJob;
if (readJob != null) {

View File

@ -13,8 +13,6 @@
package org.openhab.binding.insteon.internal.transport;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
@ -24,8 +22,6 @@ import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.io.transport.serial.SerialPortIdentifier;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements IOStream for serial devices
@ -37,8 +33,6 @@ import org.slf4j.LoggerFactory;
*/
@NonNullByDefault
public class SerialIOStream extends IOStream {
private final Logger logger = LoggerFactory.getLogger(SerialIOStream.class);
private String name;
private int baudRate;
private SerialPortManager serialPortManager;
@ -50,14 +44,9 @@ public class SerialIOStream extends IOStream {
this.serialPortManager = serialPortManager;
}
@Override
public boolean isOpen() {
return port != null;
}
@Override
public boolean open() {
if (isOpen()) {
if (port != null) {
logger.warn("serial port is already open");
return false;
}
@ -93,25 +82,7 @@ public class SerialIOStream extends IOStream {
@Override
public void close() {
InputStream in = this.in;
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}
OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
super.close();
SerialPort port = this.port;
if (port != null) {

View File

@ -13,15 +13,11 @@
package org.openhab.binding.insteon.internal.transport;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements IOStream for an Insteon Legacy Hub
@ -33,8 +29,6 @@ import org.slf4j.LoggerFactory;
*/
@NonNullByDefault
public class TcpIOStream extends IOStream {
private final Logger logger = LoggerFactory.getLogger(TcpIOStream.class);
private String host;
private int port;
private @Nullable Socket socket;
@ -50,14 +44,9 @@ public class TcpIOStream extends IOStream {
this.port = port;
}
@Override
public boolean isOpen() {
return socket != null;
}
@Override
public boolean open() {
if (isOpen()) {
if (socket != null) {
logger.warn("socket is already open");
return false;
}
@ -79,25 +68,7 @@ public class TcpIOStream extends IOStream {
@Override
public void close() {
InputStream in = this.in;
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}
OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
super.close();
Socket socket = this.socket;
if (socket != null) {

View File

@ -78,7 +78,7 @@ public class MsgFactory {
end += l;
// copy the incoming data to the end of the buffer
if (logger.isTraceEnabled()) {
logger.trace("read buffer: len {} data: {}", end, HexUtils.getHexString(buf, end, false));
logger.trace("read buffer: len {} data: {}", end, HexUtils.getHexString(buf, end));
}
}
@ -136,7 +136,7 @@ public class MsgFactory {
done = true;
}
if (logger.isTraceEnabled()) {
logger.trace("keeping buffer len {} data: {}", end, HexUtils.getHexString(buf, end, false));
logger.trace("keeping buffer len {} data: {}", end, HexUtils.getHexString(buf, end));
}
return msg;
}

View File

@ -71,33 +71,29 @@ public class HexUtils {
return s;
}
/**
* Returns a hex string for a given byte array
*
* @param bytes the byte array
* @return the formatted hex string
*/
public static String getHexString(byte[] bytes) {
return getHexString(bytes, bytes.length);
}
/**
* Returns a hex string for a given byte array and length
*
* @param bytes the byte array
* @param len the string length
* @return the formatted hex string
* @throws ArrayIndexOutOfBoundsException
*/
public static String getHexString(byte[] bytes, int len) {
return getHexString(bytes, len, true);
}
/**
* Returns a hex string for a given byte array, length and prefix flag
*
* @param bytes the byte array
* @param len the string length
* @param addPrefix if hex prefix should be added
* @return the formatted hex string
*/
public static String getHexString(byte[] bytes, int len, boolean addPrefix) {
public static String getHexString(byte[] bytes, int len) throws ArrayIndexOutOfBoundsException {
String s = "";
for (int i = 0; i < bytes.length && i < len; i++) {
for (int i = 0; i < len; i++) {
s += String.format("%02X", bytes[i] & 0xFF);
}
if (!s.isEmpty() && addPrefix) {
s = "0x" + s;
}
return s;
}