Trying again to clean everything

Signed-off-by: clinique <gael@lhopital.org>
This commit is contained in:
clinique 2025-01-01 01:44:56 +01:00 committed by gael@lhopital.org
parent b51b0e2a0e
commit a2befde47b
2 changed files with 56 additions and 53 deletions

View File

@ -19,10 +19,10 @@ import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Optional;
import java.util.Random;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.gce.internal.model.M2MMessageParser;
import org.openhab.binding.gce.internal.model.PortDefinition;
import org.openhab.binding.gce.internal.model.StatusFile;
@ -54,9 +54,10 @@ public class Ipx800DeviceConnector extends Thread {
private final StatusFileAccessor statusAccessor;
private final Ipx800EventListener listener;
private Optional<Socket> socket = Optional.empty();
private Optional<BufferedReader> input = Optional.empty();
private Optional<PrintWriter> output = Optional.empty();
private @Nullable Socket socket;
private @Nullable BufferedReader input;
private @Nullable PrintWriter output;
private @Nullable InputStreamReader streamReader;
private int failedKeepalive = 0;
private boolean waitingKeepaliveResponse = false;
@ -82,27 +83,41 @@ public class Ipx800DeviceConnector extends Thread {
logger.debug("Connecting to {}:{}...", hostname, portNumber);
Socket socket = new Socket(hostname, portNumber);
socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MS);
// socket.getInputStream().skip(socket.getInputStream().available());
this.socket = Optional.of(socket);
this.socket = socket;
output = Optional.of(new PrintWriter(socket.getOutputStream(), true));
input = Optional.of(new BufferedReader(new InputStreamReader(socket.getInputStream())));
output = new PrintWriter(socket.getOutputStream(), true);
streamReader = new InputStreamReader(socket.getInputStream());
input = new BufferedReader(streamReader);
}
/**
* Disconnect the device
*/
private void disconnect() {
socket.ifPresent(client -> {
if (socket instanceof Socket client) {
try {
if (output instanceof PrintWriter out) {
out.close();
output = null;
}
if (input instanceof BufferedReader in) {
in.close();
input = null;
}
if (streamReader instanceof InputStreamReader stream) {
stream.close();
streamReader = null;
}
logger.debug("Closing socket");
client.close();
} catch (IOException ignore) {
} catch (IOException e) {
logger.warn("Exception closing streams: {}", e.getMessage());
}
socket = Optional.empty();
input = Optional.empty();
output = Optional.empty();
});
socket = null;
}
}
/**
@ -114,18 +129,20 @@ public class Ipx800DeviceConnector extends Thread {
}
public synchronized void send(String message) {
output.ifPresentOrElse(out -> {
if (output instanceof PrintWriter out) {
logger.debug("Sending '{}' to Ipx800", message);
out.println(message);
}, () -> logger.warn("Unable to send '{}' when the output stream is closed.", message));
} else {
logger.warn("Unable to send '{}' when the output stream is closed.", message);
}
}
/**
* Send an arbitrary keepalive command which cause the IPX to send an update.
* Send a random keepalive command which cause the IPX to send an update.
* If we don't receive the update maxKeepAliveFailure time, the connection is closed and reopened
*/
private void sendKeepalive() {
output.ifPresentOrElse(out -> {
if (output instanceof PrintWriter out) {
PortDefinition pd = PortDefinition.values()[randomizer.nextInt(PortDefinition.AS_SET.size())];
String command = "%s%d".formatted(pd.m2mCommand, randomizer.nextInt(pd.quantity) + 1);
@ -141,7 +158,9 @@ public class Ipx800DeviceConnector extends Thread {
parser.setExpectedResponse(command);
waitingKeepaliveResponse = true;
}, () -> logger.warn("Unable to send keepAlive when the output stream is closed."));
} else {
logger.warn("Unable to send keepAlive when the output stream is closed.");
}
}
@Override
@ -154,7 +173,7 @@ public class Ipx800DeviceConnector extends Thread {
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
throw new IOException("Max keep alive attempts has been reached");
}
input.ifPresent(in -> {
if (input instanceof BufferedReader in) {
try {
String command = in.readLine();
waitingKeepaliveResponse = false;
@ -162,7 +181,7 @@ public class Ipx800DeviceConnector extends Thread {
} catch (IOException e) {
handleException(e);
}
});
}
}
disconnect();
} catch (IOException e) {

View File

@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -73,30 +72,10 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList
private static final double ANALOG_SAMPLING = 0.000050354;
private final Logger logger = LoggerFactory.getLogger(Ipx800v3Handler.class);
private final Map<String, PortData> portDatas = new HashMap<>();
private final Map<ChannelUID, PortData> portDatas = new HashMap<>();
private @Nullable Ipx800DeviceConnector deviceConnector;
private Optional<ScheduledFuture<?>> refreshJob = Optional.empty();
private class LongPressEvaluator implements Runnable {
private final Instant referenceTime;
private final String port;
private final String eventChannelId;
public LongPressEvaluator(Channel channel, String port, PortData portData) {
this.referenceTime = portData.getTimestamp();
this.port = port;
this.eventChannelId = "%s-%s".formatted(channel.getUID().getId(), TRIGGER_CONTACT);
}
@Override
public void run() {
if (portDatas.get(port) instanceof PortData currentData && currentData.getValue() == 1
&& referenceTime.equals(currentData.getTimestamp())) {
triggerChannel(eventChannelId, EVENT_LONG_PRESS);
}
}
}
private List<ScheduledFuture<?>> jobs = new ArrayList<>();
public Ipx800v3Handler(Thing thing) {
super(thing);
@ -112,7 +91,7 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList
updateStatus(ThingStatus.UNKNOWN);
refreshJob = Optional.of(scheduler.scheduleWithFixedDelay(this::readStatusFile, 1500, config.pullInterval,
jobs.add(scheduler.scheduleWithFixedDelay(this::readStatusFile, 1500, config.pullInterval,
TimeUnit.MILLISECONDS));
}
@ -144,7 +123,7 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList
int nbElements = status != null ? status.getPorts(portDefinition).size() : portDefinition.quantity;
for (int i = 0; i < nbElements; i++) {
ChannelUID portChannelUID = createChannels(portDefinition, i, channels);
portDatas.put(portChannelUID.getId(), new PortData());
portDatas.put(portChannelUID, new PortData());
}
});
updateThing(editThing().withChannels(channels).build());
@ -162,8 +141,8 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList
@Override
public void dispose() {
refreshJob.ifPresent(job -> job.cancel(true));
refreshJob = Optional.empty();
jobs.forEach(job -> job.cancel(true));
jobs.clear();
if (deviceConnector instanceof Ipx800DeviceConnector connector) {
connector.dispose();
@ -248,10 +227,11 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList
public void dataReceived(String port, double value) {
updateStatus(ThingStatus.ONLINE);
if (thing.getChannel(PortDefinition.asChannelId(port)) instanceof Channel channel) {
String channelId = channel.getUID().getId();
ChannelUID channelUID = channel.getUID();
String channelId = channelUID.getId();
if (portDatas.get(channelId) instanceof PortData portData
&& channel.getUID().getGroupId() instanceof String groupId) {
if (portDatas.get(channelUID) instanceof PortData portData
&& channelUID.getGroupId() instanceof String groupId) {
Instant now = Instant.now();
Configuration configuration = channel.getConfiguration();
PortDefinition portDefinition = PortDefinition.fromGroupId(groupId);
@ -275,8 +255,12 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList
if (value == 1) { // CLOSED
if (config.longPressTime != 0 && portData.isInitialized()) {
scheduler.schedule(new LongPressEvaluator(channel, port, portData),
config.longPressTime, TimeUnit.MILLISECONDS);
jobs.add(scheduler.schedule(() -> {
if (portData.getValue() == 1 && now.equals(portData.getTimestamp())) {
String eventChannelId = "%s-%s".formatted(channelUID.getId(), TRIGGER_CONTACT);
triggerChannel(eventChannelId, EVENT_LONG_PRESS);
}
}, config.longPressTime, TimeUnit.MILLISECONDS));
} else if (config.pulsePeriod != 0) {
portData.setPulsing(scheduler.scheduleWithFixedDelay(() -> {
triggerPushButtonChannel(channel, EVENT_PULSE);