[insteon] Refactor iostream transport classes (#17930)

Signed-off-by: jsetton <jeremy.setton@gmail.com>
This commit is contained in:
Jeremy 2024-12-21 18:47:36 -05:00 committed by GitHub
parent 6acfeb65f3
commit 1b75e03ca8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 92 additions and 188 deletions

View File

@ -16,7 +16,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Base64; import java.util.Base64;
import java.util.Objects; import java.util.Objects;
@ -34,8 +33,6 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.openhab.binding.insteon.internal.utils.HexUtils; import org.openhab.binding.insteon.internal.utils.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Implements IOStream for an Insteon Hub 2 * Implements IOStream for an Insteon Hub 2
@ -47,10 +44,8 @@ import org.slf4j.LoggerFactory;
*/ */
@NonNullByDefault @NonNullByDefault
public class HubIOStream extends IOStream { public class HubIOStream extends IOStream {
private final Logger logger = LoggerFactory.getLogger(HubIOStream.class); private static final String BUFFER_TAG_START = "<BS>";
private static final String BUFFER_TAG_END = "</BS>";
private static final String BS_START = "<BS>";
private static final String BS_END = "</BS>";
private static final int REQUEST_TIMEOUT = 30; // in seconds private static final int REQUEST_TIMEOUT = 30; // in seconds
private String host; private String host;
@ -61,7 +56,7 @@ public class HubIOStream extends IOStream {
private ScheduledExecutorService scheduler; private ScheduledExecutorService scheduler;
private @Nullable ScheduledFuture<?> job; private @Nullable ScheduledFuture<?> job;
// index of the last byte we have read in the buffer // index of the last byte we have read in the buffer
private int bufferIdx = -1; private volatile int bufferIdx = -1;
/** /**
* Constructor * Constructor
@ -84,14 +79,9 @@ public class HubIOStream extends IOStream {
this.scheduler = scheduler; this.scheduler = scheduler;
} }
@Override
public boolean isOpen() {
return job != null;
}
@Override @Override
public boolean open() { public boolean open() {
if (isOpen()) { if (job != null) {
logger.warn("hub stream is already open"); logger.warn("hub stream is already open");
return false; return false;
} }
@ -120,93 +110,65 @@ public class HubIOStream extends IOStream {
@Override @Override
public void close() { public void close() {
super.close();
ScheduledFuture<?> job = this.job; ScheduledFuture<?> job = this.job;
if (job != null) { if (job != null) {
job.cancel(true); job.cancel(true);
this.job = null; this.job = null;
} }
InputStream in = this.in;
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}
OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
} }
/** /**
* Fetches the latest status buffer from the Hub * Returns the latest buffer from the Hub
* *
* @return string with status buffer * @return the buffer string
* @throws IOException * @throws IOException
*/ */
private synchronized String bufferStatus() throws IOException { private String getBuffer() throws IOException {
String result = getURL("/buffstatus.xml"); String result = getURL("/buffstatus.xml");
int start = result.indexOf(BS_START); int start = result.indexOf(BUFFER_TAG_START);
if (start == -1) { int end = result.indexOf(BUFFER_TAG_END, start);
throw new IOException("malformed bufferstatus.xml"); if (start == -1 || end == -1) {
} throw new IOException("malformed buffstatus.xml");
start += BS_START.length();
int end = result.indexOf(BS_END, start);
if (end == -1) {
throw new IOException("malformed bufferstatus.xml");
} }
start += BUFFER_TAG_START.length();
return result.substring(start, end).trim(); return result.substring(start, end).trim();
} }
/** /**
* Sends command to Hub to clear the status buffer * Clears the Hub buffer
* *
* @throws IOException * @throws IOException
*/ */
private synchronized void clearBuffer() throws IOException { private void clearBuffer() throws IOException {
logger.trace("clearing buffer"); logger.trace("clearing buffer");
getURL("/1?XB=M=1"); getURL("/1?XB=M=1");
bufferIdx = 0; bufferIdx = 0;
} }
/** /**
* Sends Insteon message (byte array) as a readable ascii string to the Hub * Sends a message to the Hub
* *
* @param msg byte array representing the Insteon message * @param b byte array representing the Insteon message
* @throws IOException in case of I/O error * @throws IOException
*/ */
public synchronized void write(ByteBuffer msg) throws IOException { private void sendMessage(byte[] b) throws IOException {
poll(); // fetch the status buffer before we send out commands poll(); // poll the status buffer before we send the message
logger.trace("sending a message");
StringBuilder b = new StringBuilder(); getURL("/3?" + HexUtils.getHexString(b) + "=I=3");
while (msg.remaining() > 0) {
b.append(String.format("%02x", msg.get()));
}
String hexMsg = b.toString();
logger.trace("writing a message");
getURL("/3?" + hexMsg + "=I=3");
bufferIdx = 0; bufferIdx = 0;
} }
/** /**
* Polls the Hub web interface to fetch the status buffer * Polls the Hub buffer and add to input stream
* *
* @throws IOException if something goes wrong with I/O * @throws IOException
*/ */
private synchronized void poll() throws IOException { private void poll() throws IOException {
String buffer = bufferStatus(); // fetch via http call String buffer = getBuffer();
logger.trace("poll: {}", buffer); logger.trace("poll: {}", buffer);
// The Hub maintains a ring buffer where the last two digits (in hex!) represent // The Hub maintains a ring buffer where the last two digits (in hex!) represent
// the position of the last byte read. // the position of the last byte read.
@ -249,10 +211,9 @@ public class HubIOStream extends IOStream {
logger.trace("no wrap: appending new data: {}", msg); logger.trace("no wrap: appending new data: {}", msg);
} }
if (msg.length() != 0) { if (msg.length() != 0) {
byte[] array = HexUtils.toByteArray(msg.toString()); byte[] b = HexUtils.toByteArray(msg.toString());
ByteBuffer buf = ByteBuffer.wrap(array);
if (in instanceof HubInputStream hubInput) { if (in instanceof HubInputStream hubInput) {
hubInput.handle(buf); hubInput.add(b);
} else { } else {
logger.debug("hub input stream is null"); logger.debug("hub input stream is null");
} }
@ -310,10 +271,10 @@ public class HubIOStream extends IOStream {
// A buffer to keep bytes while we are waiting for the inputstream to read // A buffer to keep bytes while we are waiting for the inputstream to read
private ReadByteBuffer buffer = new ReadByteBuffer(1024); private ReadByteBuffer buffer = new ReadByteBuffer(1024);
public void handle(ByteBuffer b) throws IOException { public void add(byte[] b) throws IOException {
// Make sure we cleanup as much space as possible // Make sure we cleanup as much space as possible
buffer.makeCompact(); buffer.makeCompact();
buffer.add(b.array()); buffer.add(b);
} }
@Override @Override
@ -342,18 +303,18 @@ public class HubIOStream extends IOStream {
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
out.write(b); out.write(b);
flushBuffer(); flush();
} }
@Override @Override
public void write(byte @Nullable [] b, int off, int len) throws IOException { public void write(byte @Nullable [] b, int off, int len) throws IOException {
out.write(b, off, len); out.write(b, off, len);
flushBuffer(); flush();
} }
private void flushBuffer() throws IOException { @Override
ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray()); public void flush() throws IOException {
HubIOStream.this.write(buffer); sendMessage(out.toByteArray());
out.reset(); out.reset();
} }
} }

View File

@ -26,6 +26,8 @@ import org.openhab.binding.insteon.internal.config.InsteonHub1Configuration;
import org.openhab.binding.insteon.internal.config.InsteonHub2Configuration; import org.openhab.binding.insteon.internal.config.InsteonHub2Configuration;
import org.openhab.binding.insteon.internal.config.InsteonPLMConfiguration; import org.openhab.binding.insteon.internal.config.InsteonPLMConfiguration;
import org.openhab.core.io.transport.serial.SerialPortManager; import org.openhab.core.io.transport.serial.SerialPortManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Abstract class for implementation for I/O stream with anything that looks * Abstract class for implementation for I/O stream with anything that looks
@ -38,6 +40,7 @@ import org.openhab.core.io.transport.serial.SerialPortManager;
*/ */
@NonNullByDefault @NonNullByDefault
public abstract class IOStream { public abstract class IOStream {
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected @Nullable InputStream in; protected @Nullable InputStream in;
protected @Nullable OutputStream out; protected @Nullable OutputStream out;
@ -47,20 +50,17 @@ public abstract class IOStream {
* *
* @param b byte array (output) * @param b byte array (output)
* @return number of bytes read * @return number of bytes read
* @throws InterruptedException
* @throws IOException
*/ */
public int read(byte @Nullable [] b) throws InterruptedException, IOException { public int read(byte @Nullable [] b) throws InterruptedException, IOException {
InputStream in = this.in;
if (in == null) {
throw new IOException("input stream not defined");
}
int len = 0; int len = 0;
while (len == 0) { while (len == 0) {
if (!isOpen()) { len = in.read(b);
throw new IOException("io stream not open");
}
InputStream in = this.in;
if (in != null) {
len = in.read(b);
} else {
throw new IOException("input stream not defined");
}
if (Thread.interrupted()) { if (Thread.interrupted()) {
throw new InterruptedException(); throw new InterruptedException();
@ -77,27 +77,16 @@ public abstract class IOStream {
* Writes data to IOStream * Writes data to IOStream
* *
* @param b byte array to write * @param b byte array to write
* @throws IOException
*/ */
public void write(byte @Nullable [] b) throws InterruptedException, IOException { public void write(byte @Nullable [] b) throws IOException {
if (!isOpen()) {
throw new IOException("io stream not open");
}
OutputStream out = this.out; OutputStream out = this.out;
if (out != null) { if (out == null) {
out.write(b);
} else {
throw new IOException("output stream not defined"); throw new IOException("output stream not defined");
} }
out.write(b);
} }
/**
* Returns if IOStream is open
*
* @return true if stream is open, false if not
*/
public abstract boolean isOpen();
/** /**
* Opens the IOStream * Opens the IOStream
* *
@ -108,7 +97,27 @@ public abstract class IOStream {
/** /**
* Closes the IOStream * Closes the IOStream
*/ */
public abstract void close(); public void close() {
InputStream in = this.in;
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}
OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
}
/** /**
* Creates an IOStream from an insteon bridge config object * Creates an IOStream from an insteon bridge config object

View File

@ -205,9 +205,7 @@ public class LegacyPort {
mdbb.stop(); mdbb.stop();
} }
if (ioStream.isOpen()) { ioStream.close();
ioStream.close();
}
ScheduledFuture<?> readJob = this.readJob; ScheduledFuture<?> readJob = this.readJob;
if (readJob != null) { if (readJob != null) {

View File

@ -135,9 +135,7 @@ public class Port {
connected.set(false); connected.set(false);
if (ioStream.isOpen()) { ioStream.close();
ioStream.close();
}
ScheduledFuture<?> readJob = this.readJob; ScheduledFuture<?> readJob = this.readJob;
if (readJob != null) { if (readJob != null) {

View File

@ -13,8 +13,6 @@
package org.openhab.binding.insteon.internal.transport; package org.openhab.binding.insteon.internal.transport;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
@ -24,8 +22,6 @@ import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.io.transport.serial.SerialPortIdentifier; import org.openhab.core.io.transport.serial.SerialPortIdentifier;
import org.openhab.core.io.transport.serial.SerialPortManager; import org.openhab.core.io.transport.serial.SerialPortManager;
import org.openhab.core.io.transport.serial.UnsupportedCommOperationException; import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Implements IOStream for serial devices * Implements IOStream for serial devices
@ -37,8 +33,6 @@ import org.slf4j.LoggerFactory;
*/ */
@NonNullByDefault @NonNullByDefault
public class SerialIOStream extends IOStream { public class SerialIOStream extends IOStream {
private final Logger logger = LoggerFactory.getLogger(SerialIOStream.class);
private String name; private String name;
private int baudRate; private int baudRate;
private SerialPortManager serialPortManager; private SerialPortManager serialPortManager;
@ -50,14 +44,9 @@ public class SerialIOStream extends IOStream {
this.serialPortManager = serialPortManager; this.serialPortManager = serialPortManager;
} }
@Override
public boolean isOpen() {
return port != null;
}
@Override @Override
public boolean open() { public boolean open() {
if (isOpen()) { if (port != null) {
logger.warn("serial port is already open"); logger.warn("serial port is already open");
return false; return false;
} }
@ -93,25 +82,7 @@ public class SerialIOStream extends IOStream {
@Override @Override
public void close() { public void close() {
InputStream in = this.in; super.close();
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}
OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
SerialPort port = this.port; SerialPort port = this.port;
if (port != null) { if (port != null) {

View File

@ -13,15 +13,11 @@
package org.openhab.binding.insteon.internal.transport; package org.openhab.binding.insteon.internal.transport;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Implements IOStream for an Insteon Legacy Hub * Implements IOStream for an Insteon Legacy Hub
@ -33,8 +29,6 @@ import org.slf4j.LoggerFactory;
*/ */
@NonNullByDefault @NonNullByDefault
public class TcpIOStream extends IOStream { public class TcpIOStream extends IOStream {
private final Logger logger = LoggerFactory.getLogger(TcpIOStream.class);
private String host; private String host;
private int port; private int port;
private @Nullable Socket socket; private @Nullable Socket socket;
@ -50,14 +44,9 @@ public class TcpIOStream extends IOStream {
this.port = port; this.port = port;
} }
@Override
public boolean isOpen() {
return socket != null;
}
@Override @Override
public boolean open() { public boolean open() {
if (isOpen()) { if (socket != null) {
logger.warn("socket is already open"); logger.warn("socket is already open");
return false; return false;
} }
@ -79,25 +68,7 @@ public class TcpIOStream extends IOStream {
@Override @Override
public void close() { public void close() {
InputStream in = this.in; super.close();
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}
OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
Socket socket = this.socket; Socket socket = this.socket;
if (socket != null) { if (socket != null) {

View File

@ -78,7 +78,7 @@ public class MsgFactory {
end += l; end += l;
// copy the incoming data to the end of the buffer // copy the incoming data to the end of the buffer
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("read buffer: len {} data: {}", end, HexUtils.getHexString(buf, end, false)); logger.trace("read buffer: len {} data: {}", end, HexUtils.getHexString(buf, end));
} }
} }
@ -136,7 +136,7 @@ public class MsgFactory {
done = true; done = true;
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("keeping buffer len {} data: {}", end, HexUtils.getHexString(buf, end, false)); logger.trace("keeping buffer len {} data: {}", end, HexUtils.getHexString(buf, end));
} }
return msg; return msg;
} }

View File

@ -71,33 +71,29 @@ public class HexUtils {
return s; return s;
} }
/**
* Returns a hex string for a given byte array
*
* @param bytes the byte array
* @return the formatted hex string
*/
public static String getHexString(byte[] bytes) {
return getHexString(bytes, bytes.length);
}
/** /**
* Returns a hex string for a given byte array and length * Returns a hex string for a given byte array and length
* *
* @param bytes the byte array * @param bytes the byte array
* @param len the string length * @param len the string length
* @return the formatted hex string * @return the formatted hex string
* @throws ArrayIndexOutOfBoundsException
*/ */
public static String getHexString(byte[] bytes, int len) { public static String getHexString(byte[] bytes, int len) throws ArrayIndexOutOfBoundsException {
return getHexString(bytes, len, true);
}
/**
* Returns a hex string for a given byte array, length and prefix flag
*
* @param bytes the byte array
* @param len the string length
* @param addPrefix if hex prefix should be added
* @return the formatted hex string
*/
public static String getHexString(byte[] bytes, int len, boolean addPrefix) {
String s = ""; String s = "";
for (int i = 0; i < bytes.length && i < len; i++) { for (int i = 0; i < len; i++) {
s += String.format("%02X", bytes[i] & 0xFF); s += String.format("%02X", bytes[i] & 0xFF);
} }
if (!s.isEmpty() && addPrefix) {
s = "0x" + s;
}
return s; return s;
} }