Reviewing connector logic

Signed-off-by: gael@lhopital.org <gael@lhopital.org>
This commit is contained in:
gael@lhopital.org 2025-01-02 18:05:46 +01:00
parent a2befde47b
commit 7d46776956
2 changed files with 71 additions and 125 deletions

View File

@ -17,12 +17,11 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Random;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.gce.internal.model.M2MMessageParser;
import org.openhab.binding.gce.internal.model.PortDefinition;
import org.openhab.binding.gce.internal.model.StatusFile;
@ -42,43 +41,26 @@ import org.xml.sax.SAXException;
@NonNullByDefault
public class Ipx800DeviceConnector extends Thread {
private static final int DEFAULT_SOCKET_TIMEOUT_MS = 10000;
private static final int DEFAULT_RECONNECT_TIMEOUT_MS = 5000;
private static final int MAX_KEEPALIVE_FAILURE = 3;
private final Logger logger = LoggerFactory.getLogger(Ipx800DeviceConnector.class);
private final Random randomizer = new Random();
private final String hostname;
private final int portNumber;
private final M2MMessageParser parser;
private final StatusFileAccessor statusAccessor;
private final Ipx800EventListener listener;
private @Nullable Socket socket;
private @Nullable BufferedReader input;
private @Nullable PrintWriter output;
private @Nullable InputStreamReader streamReader;
private final Socket socket;
private final BufferedReader input;
private final PrintWriter output;
private int failedKeepalive = 0;
private boolean waitingKeepaliveResponse = false;
private boolean interrupted = false;
public Ipx800DeviceConnector(String hostname, int portNumber, ThingUID uid, Ipx800EventListener listener) {
public Ipx800DeviceConnector(String hostname, int portNumber, ThingUID uid, Ipx800EventListener listener)
throws UnknownHostException, IOException {
super("OH-binding-" + uid);
this.hostname = hostname;
this.portNumber = portNumber;
this.listener = listener;
this.parser = new M2MMessageParser(listener);
this.statusAccessor = new StatusFileAccessor(hostname);
setDaemon(true);
}
/**
* Connect to the ipx800
*
* @throws IOException
*/
private void connect() throws IOException {
disconnect();
logger.debug("Connecting to {}:{}...", hostname, portNumber);
Socket socket = new Socket(hostname, portNumber);
@ -86,126 +68,85 @@ public class Ipx800DeviceConnector extends Thread {
this.socket = socket;
output = new PrintWriter(socket.getOutputStream(), true);
streamReader = new InputStreamReader(socket.getInputStream());
input = new BufferedReader(streamReader);
}
/**
* Disconnect the device
*/
private void disconnect() {
if (socket instanceof Socket client) {
try {
if (output instanceof PrintWriter out) {
out.close();
output = null;
}
if (input instanceof BufferedReader in) {
in.close();
input = null;
}
if (streamReader instanceof InputStreamReader stream) {
stream.close();
streamReader = null;
}
logger.debug("Closing socket");
client.close();
} catch (IOException e) {
logger.warn("Exception closing streams: {}", e.getMessage());
}
socket = null;
}
input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
parser = new M2MMessageParser(listener);
statusAccessor = new StatusFileAccessor(hostname);
setDaemon(true);
}
/**
* Stop the device thread
*/
public void dispose() {
interrupt();
disconnect();
interrupted = true;
}
public synchronized void send(String message) {
if (output instanceof PrintWriter out) {
logger.debug("Sending '{}' to Ipx800", message);
out.println(message);
} else {
logger.warn("Unable to send '{}' when the output stream is closed.", message);
}
logger.debug("Sending '{}' to Ipx800", message);
output.println(message);
}
/**
* Send a random keepalive command which cause the IPX to send an update.
* If we don't receive the update maxKeepAliveFailure time, the connection is closed and reopened
* If we don't receive the update maxKeepAliveFailure time, the connection is closed
*/
private void sendKeepalive() {
if (output instanceof PrintWriter out) {
PortDefinition pd = PortDefinition.values()[randomizer.nextInt(PortDefinition.AS_SET.size())];
String command = "%s%d".formatted(pd.m2mCommand, randomizer.nextInt(pd.quantity) + 1);
PortDefinition pd = PortDefinition.values()[randomizer.nextInt(PortDefinition.AS_SET.size())];
String command = "%s%d".formatted(pd.m2mCommand, randomizer.nextInt(pd.quantity) + 1);
if (waitingKeepaliveResponse) {
failedKeepalive++;
logger.debug("Sending keepalive {}, attempt {}", command, failedKeepalive);
} else {
failedKeepalive = 0;
logger.debug("Sending keepalive {}", command);
}
out.println(command);
parser.setExpectedResponse(command);
waitingKeepaliveResponse = true;
if (waitingKeepaliveResponse) {
failedKeepalive++;
logger.debug("Sending keepalive {}, attempt {}", command, failedKeepalive);
} else {
logger.warn("Unable to send keepAlive when the output stream is closed.");
failedKeepalive = 0;
logger.debug("Sending keepalive {}", command);
}
output.println(command);
parser.setExpectedResponse(command);
waitingKeepaliveResponse = true;
}
@Override
public void run() {
try {
waitingKeepaliveResponse = false;
failedKeepalive = 0;
connect();
while (!interrupted()) {
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
throw new IOException("Max keep alive attempts has been reached");
}
if (input instanceof BufferedReader in) {
try {
String command = in.readLine();
waitingKeepaliveResponse = false;
parser.unsolicitedUpdate(command);
} catch (IOException e) {
handleException(e);
}
}
while (!interrupted) {
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
interrupted = true;
listener.errorOccurred(new IOException("Max keep alive attempts has been reached"));
}
disconnect();
} catch (IOException e) {
handleException(e);
}
try {
Thread.sleep(DEFAULT_RECONNECT_TIMEOUT_MS);
} catch (InterruptedException e) {
dispose();
}
}
private void handleException(Exception e) {
if (!interrupted()) {
if (e instanceof SocketTimeoutException) {
try {
String command = input.readLine();
waitingKeepaliveResponse = false;
parser.unsolicitedUpdate(command);
} catch (SocketTimeoutException e) {
sendKeepalive();
return;
} else if (e instanceof SocketException) {
logger.debug("SocketException raised by streams while closing socket");
} else if (e instanceof IOException) {
logger.warn("Communication error: '{}'. Will retry in {} ms", e, DEFAULT_RECONNECT_TIMEOUT_MS);
} catch (IOException e) {
interrupted = true;
listener.errorOccurred(e);
}
listener.errorOccurred(e);
}
if (output instanceof PrintWriter out) {
out.close();
}
if (input instanceof BufferedReader in) {
try {
in.close();
} catch (IOException e) {
logger.warn("Exception input stream: {}", e.getMessage());
}
}
if (socket instanceof Socket client) {
try {
logger.debug("Closing socket");
client.close();
} catch (IOException e) {
logger.warn("Exception closing socket: {}", e.getMessage());
}
}
}
public StatusFile readStatusFile() throws SAXException, IOException {

View File

@ -15,6 +15,7 @@ package org.openhab.binding.gce.internal.handler;
import static org.openhab.binding.gce.internal.GCEBindingConstants.*;
import java.io.IOException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
@ -87,12 +88,16 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList
Ipx800Configuration config = getConfigAs(Ipx800Configuration.class);
deviceConnector = new Ipx800DeviceConnector(config.hostname, config.portNumber, getThing().getUID(), this);
updateStatus(ThingStatus.UNKNOWN);
jobs.add(scheduler.scheduleWithFixedDelay(this::readStatusFile, 1500, config.pullInterval,
TimeUnit.MILLISECONDS));
try {
deviceConnector = new Ipx800DeviceConnector(config.hostname, config.portNumber, getThing().getUID(), this);
updateStatus(ThingStatus.UNKNOWN);
jobs.add(scheduler.scheduleWithFixedDelay(this::readStatusFile, 1500, config.pullInterval,
TimeUnit.MILLISECONDS));
} catch (UnknownHostException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
} catch (IOException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
}
}
private void readStatusFile() {