mirror of
https://github.com/openhab/openhab-addons.git
synced 2025-01-10 07:02:02 +01:00
Reviewing connector logic
Signed-off-by: gael@lhopital.org <gael@lhopital.org>
This commit is contained in:
parent
7dd809b8ab
commit
a3d3feec7f
@ -17,12 +17,11 @@ import java.io.IOException;
|
|||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.SocketException;
|
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
import org.eclipse.jdt.annotation.Nullable;
|
|
||||||
import org.openhab.binding.gce.internal.model.M2MMessageParser;
|
import org.openhab.binding.gce.internal.model.M2MMessageParser;
|
||||||
import org.openhab.binding.gce.internal.model.PortDefinition;
|
import org.openhab.binding.gce.internal.model.PortDefinition;
|
||||||
import org.openhab.binding.gce.internal.model.StatusFile;
|
import org.openhab.binding.gce.internal.model.StatusFile;
|
||||||
@ -42,43 +41,26 @@ import org.xml.sax.SAXException;
|
|||||||
@NonNullByDefault
|
@NonNullByDefault
|
||||||
public class Ipx800DeviceConnector extends Thread {
|
public class Ipx800DeviceConnector extends Thread {
|
||||||
private static final int DEFAULT_SOCKET_TIMEOUT_MS = 10000;
|
private static final int DEFAULT_SOCKET_TIMEOUT_MS = 10000;
|
||||||
private static final int DEFAULT_RECONNECT_TIMEOUT_MS = 5000;
|
|
||||||
private static final int MAX_KEEPALIVE_FAILURE = 3;
|
private static final int MAX_KEEPALIVE_FAILURE = 3;
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(Ipx800DeviceConnector.class);
|
private final Logger logger = LoggerFactory.getLogger(Ipx800DeviceConnector.class);
|
||||||
private final Random randomizer = new Random();
|
private final Random randomizer = new Random();
|
||||||
|
|
||||||
private final String hostname;
|
|
||||||
private final int portNumber;
|
|
||||||
private final M2MMessageParser parser;
|
private final M2MMessageParser parser;
|
||||||
private final StatusFileAccessor statusAccessor;
|
private final StatusFileAccessor statusAccessor;
|
||||||
private final Ipx800EventListener listener;
|
private final Ipx800EventListener listener;
|
||||||
|
private final Socket socket;
|
||||||
private @Nullable Socket socket;
|
private final BufferedReader input;
|
||||||
private @Nullable BufferedReader input;
|
private final PrintWriter output;
|
||||||
private @Nullable PrintWriter output;
|
|
||||||
private @Nullable InputStreamReader streamReader;
|
|
||||||
|
|
||||||
private int failedKeepalive = 0;
|
private int failedKeepalive = 0;
|
||||||
private boolean waitingKeepaliveResponse = false;
|
private boolean waitingKeepaliveResponse = false;
|
||||||
|
private boolean interrupted = false;
|
||||||
|
|
||||||
public Ipx800DeviceConnector(String hostname, int portNumber, ThingUID uid, Ipx800EventListener listener) {
|
public Ipx800DeviceConnector(String hostname, int portNumber, ThingUID uid, Ipx800EventListener listener)
|
||||||
|
throws UnknownHostException, IOException {
|
||||||
super("OH-binding-" + uid);
|
super("OH-binding-" + uid);
|
||||||
this.hostname = hostname;
|
|
||||||
this.portNumber = portNumber;
|
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
this.parser = new M2MMessageParser(listener);
|
|
||||||
this.statusAccessor = new StatusFileAccessor(hostname);
|
|
||||||
setDaemon(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Connect to the ipx800
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void connect() throws IOException {
|
|
||||||
disconnect();
|
|
||||||
|
|
||||||
logger.debug("Connecting to {}:{}...", hostname, portNumber);
|
logger.debug("Connecting to {}:{}...", hostname, portNumber);
|
||||||
Socket socket = new Socket(hostname, portNumber);
|
Socket socket = new Socket(hostname, portNumber);
|
||||||
@ -86,126 +68,85 @@ public class Ipx800DeviceConnector extends Thread {
|
|||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
|
|
||||||
output = new PrintWriter(socket.getOutputStream(), true);
|
output = new PrintWriter(socket.getOutputStream(), true);
|
||||||
streamReader = new InputStreamReader(socket.getInputStream());
|
input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
|
||||||
input = new BufferedReader(streamReader);
|
parser = new M2MMessageParser(listener);
|
||||||
}
|
statusAccessor = new StatusFileAccessor(hostname);
|
||||||
|
setDaemon(true);
|
||||||
/**
|
|
||||||
* Disconnect the device
|
|
||||||
*/
|
|
||||||
private void disconnect() {
|
|
||||||
if (socket instanceof Socket client) {
|
|
||||||
try {
|
|
||||||
if (output instanceof PrintWriter out) {
|
|
||||||
out.close();
|
|
||||||
output = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (input instanceof BufferedReader in) {
|
|
||||||
in.close();
|
|
||||||
input = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamReader instanceof InputStreamReader stream) {
|
|
||||||
stream.close();
|
|
||||||
streamReader = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.debug("Closing socket");
|
|
||||||
client.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.warn("Exception closing streams: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
socket = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the device thread
|
* Stop the device thread
|
||||||
*/
|
*/
|
||||||
public void dispose() {
|
public void dispose() {
|
||||||
interrupt();
|
interrupted = true;
|
||||||
disconnect();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void send(String message) {
|
public synchronized void send(String message) {
|
||||||
if (output instanceof PrintWriter out) {
|
logger.debug("Sending '{}' to Ipx800", message);
|
||||||
logger.debug("Sending '{}' to Ipx800", message);
|
output.println(message);
|
||||||
out.println(message);
|
|
||||||
} else {
|
|
||||||
logger.warn("Unable to send '{}' when the output stream is closed.", message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a random keepalive command which cause the IPX to send an update.
|
* Send a random keepalive command which cause the IPX to send an update.
|
||||||
* If we don't receive the update maxKeepAliveFailure time, the connection is closed and reopened
|
* If we don't receive the update maxKeepAliveFailure time, the connection is closed
|
||||||
*/
|
*/
|
||||||
private void sendKeepalive() {
|
private void sendKeepalive() {
|
||||||
if (output instanceof PrintWriter out) {
|
PortDefinition pd = PortDefinition.values()[randomizer.nextInt(PortDefinition.AS_SET.size())];
|
||||||
PortDefinition pd = PortDefinition.values()[randomizer.nextInt(PortDefinition.AS_SET.size())];
|
String command = "%s%d".formatted(pd.m2mCommand, randomizer.nextInt(pd.quantity) + 1);
|
||||||
String command = "%s%d".formatted(pd.m2mCommand, randomizer.nextInt(pd.quantity) + 1);
|
|
||||||
|
|
||||||
if (waitingKeepaliveResponse) {
|
if (waitingKeepaliveResponse) {
|
||||||
failedKeepalive++;
|
failedKeepalive++;
|
||||||
logger.debug("Sending keepalive {}, attempt {}", command, failedKeepalive);
|
logger.debug("Sending keepalive {}, attempt {}", command, failedKeepalive);
|
||||||
} else {
|
|
||||||
failedKeepalive = 0;
|
|
||||||
logger.debug("Sending keepalive {}", command);
|
|
||||||
}
|
|
||||||
|
|
||||||
out.println(command);
|
|
||||||
parser.setExpectedResponse(command);
|
|
||||||
|
|
||||||
waitingKeepaliveResponse = true;
|
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Unable to send keepAlive when the output stream is closed.");
|
failedKeepalive = 0;
|
||||||
|
logger.debug("Sending keepalive {}", command);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
output.println(command);
|
||||||
|
parser.setExpectedResponse(command);
|
||||||
|
|
||||||
|
waitingKeepaliveResponse = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
while (!interrupted) {
|
||||||
waitingKeepaliveResponse = false;
|
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
|
||||||
failedKeepalive = 0;
|
interrupted = true;
|
||||||
connect();
|
listener.errorOccurred(new IOException("Max keep alive attempts has been reached"));
|
||||||
while (!interrupted()) {
|
|
||||||
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
|
|
||||||
throw new IOException("Max keep alive attempts has been reached");
|
|
||||||
}
|
|
||||||
if (input instanceof BufferedReader in) {
|
|
||||||
try {
|
|
||||||
String command = in.readLine();
|
|
||||||
waitingKeepaliveResponse = false;
|
|
||||||
parser.unsolicitedUpdate(command);
|
|
||||||
} catch (IOException e) {
|
|
||||||
handleException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
disconnect();
|
try {
|
||||||
} catch (IOException e) {
|
String command = input.readLine();
|
||||||
handleException(e);
|
waitingKeepaliveResponse = false;
|
||||||
}
|
parser.unsolicitedUpdate(command);
|
||||||
try {
|
} catch (SocketTimeoutException e) {
|
||||||
Thread.sleep(DEFAULT_RECONNECT_TIMEOUT_MS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
dispose();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleException(Exception e) {
|
|
||||||
if (!interrupted()) {
|
|
||||||
if (e instanceof SocketTimeoutException) {
|
|
||||||
sendKeepalive();
|
sendKeepalive();
|
||||||
return;
|
} catch (IOException e) {
|
||||||
} else if (e instanceof SocketException) {
|
interrupted = true;
|
||||||
logger.debug("SocketException raised by streams while closing socket");
|
listener.errorOccurred(e);
|
||||||
} else if (e instanceof IOException) {
|
|
||||||
logger.warn("Communication error: '{}'. Will retry in {} ms", e, DEFAULT_RECONNECT_TIMEOUT_MS);
|
|
||||||
}
|
}
|
||||||
listener.errorOccurred(e);
|
|
||||||
}
|
}
|
||||||
|
if (output instanceof PrintWriter out) {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (input instanceof BufferedReader in) {
|
||||||
|
try {
|
||||||
|
in.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("Exception input stream: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (socket instanceof Socket client) {
|
||||||
|
try {
|
||||||
|
logger.debug("Closing socket");
|
||||||
|
client.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("Exception closing socket: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public StatusFile readStatusFile() throws SAXException, IOException {
|
public StatusFile readStatusFile() throws SAXException, IOException {
|
||||||
|
@ -15,6 +15,7 @@ package org.openhab.binding.gce.internal.handler;
|
|||||||
import static org.openhab.binding.gce.internal.GCEBindingConstants.*;
|
import static org.openhab.binding.gce.internal.GCEBindingConstants.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
@ -87,12 +88,16 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList
|
|||||||
|
|
||||||
Ipx800Configuration config = getConfigAs(Ipx800Configuration.class);
|
Ipx800Configuration config = getConfigAs(Ipx800Configuration.class);
|
||||||
|
|
||||||
deviceConnector = new Ipx800DeviceConnector(config.hostname, config.portNumber, getThing().getUID(), this);
|
try {
|
||||||
|
deviceConnector = new Ipx800DeviceConnector(config.hostname, config.portNumber, getThing().getUID(), this);
|
||||||
updateStatus(ThingStatus.UNKNOWN);
|
updateStatus(ThingStatus.UNKNOWN);
|
||||||
|
jobs.add(scheduler.scheduleWithFixedDelay(this::readStatusFile, 1500, config.pullInterval,
|
||||||
jobs.add(scheduler.scheduleWithFixedDelay(this::readStatusFile, 1500, config.pullInterval,
|
TimeUnit.MILLISECONDS));
|
||||||
TimeUnit.MILLISECONDS));
|
} catch (UnknownHostException e) {
|
||||||
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
|
||||||
|
} catch (IOException e) {
|
||||||
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readStatusFile() {
|
private void readStatusFile() {
|
||||||
|
Loading…
Reference in New Issue
Block a user