diff --git a/bundles/org.openhab.binding.gce/src/main/java/org/openhab/binding/gce/internal/handler/Ipx800DeviceConnector.java b/bundles/org.openhab.binding.gce/src/main/java/org/openhab/binding/gce/internal/handler/Ipx800DeviceConnector.java index f2130f99b6f..f2114a13c4f 100644 --- a/bundles/org.openhab.binding.gce/src/main/java/org/openhab/binding/gce/internal/handler/Ipx800DeviceConnector.java +++ b/bundles/org.openhab.binding.gce/src/main/java/org/openhab/binding/gce/internal/handler/Ipx800DeviceConnector.java @@ -19,10 +19,10 @@ import java.io.PrintWriter; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; -import java.util.Optional; import java.util.Random; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.gce.internal.model.M2MMessageParser; import org.openhab.binding.gce.internal.model.PortDefinition; import org.openhab.binding.gce.internal.model.StatusFile; @@ -54,9 +54,10 @@ public class Ipx800DeviceConnector extends Thread { private final StatusFileAccessor statusAccessor; private final Ipx800EventListener listener; - private Optional socket = Optional.empty(); - private Optional input = Optional.empty(); - private Optional output = Optional.empty(); + private @Nullable Socket socket; + private @Nullable BufferedReader input; + private @Nullable PrintWriter output; + private @Nullable InputStreamReader streamReader; private int failedKeepalive = 0; private boolean waitingKeepaliveResponse = false; @@ -82,27 +83,41 @@ public class Ipx800DeviceConnector extends Thread { logger.debug("Connecting to {}:{}...", hostname, portNumber); Socket socket = new Socket(hostname, portNumber); socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MS); - // socket.getInputStream().skip(socket.getInputStream().available()); - this.socket = Optional.of(socket); + this.socket = socket; - output = Optional.of(new PrintWriter(socket.getOutputStream(), true)); - input = Optional.of(new BufferedReader(new InputStreamReader(socket.getInputStream()))); + output = new PrintWriter(socket.getOutputStream(), true); + streamReader = new InputStreamReader(socket.getInputStream()); + input = new BufferedReader(streamReader); } /** * Disconnect the device */ private void disconnect() { - socket.ifPresent(client -> { + if (socket instanceof Socket client) { try { + if (output instanceof PrintWriter out) { + out.close(); + output = null; + } + + if (input instanceof BufferedReader in) { + in.close(); + input = null; + } + + if (streamReader instanceof InputStreamReader stream) { + stream.close(); + streamReader = null; + } + logger.debug("Closing socket"); client.close(); - } catch (IOException ignore) { + } catch (IOException e) { + logger.warn("Exception closing streams: {}", e.getMessage()); } - socket = Optional.empty(); - input = Optional.empty(); - output = Optional.empty(); - }); + socket = null; + } } /** @@ -114,18 +129,20 @@ public class Ipx800DeviceConnector extends Thread { } public synchronized void send(String message) { - output.ifPresentOrElse(out -> { + if (output instanceof PrintWriter out) { logger.debug("Sending '{}' to Ipx800", message); out.println(message); - }, () -> logger.warn("Unable to send '{}' when the output stream is closed.", message)); + } else { + logger.warn("Unable to send '{}' when the output stream is closed.", message); + } } /** - * Send an arbitrary keepalive command which cause the IPX to send an update. + * Send a random keepalive command which cause the IPX to send an update. * If we don't receive the update maxKeepAliveFailure time, the connection is closed and reopened */ private void sendKeepalive() { - output.ifPresentOrElse(out -> { + if (output instanceof PrintWriter out) { PortDefinition pd = PortDefinition.values()[randomizer.nextInt(PortDefinition.AS_SET.size())]; String command = "%s%d".formatted(pd.m2mCommand, randomizer.nextInt(pd.quantity) + 1); @@ -141,7 +158,9 @@ public class Ipx800DeviceConnector extends Thread { parser.setExpectedResponse(command); waitingKeepaliveResponse = true; - }, () -> logger.warn("Unable to send keepAlive when the output stream is closed.")); + } else { + logger.warn("Unable to send keepAlive when the output stream is closed."); + } } @Override @@ -154,7 +173,7 @@ public class Ipx800DeviceConnector extends Thread { if (failedKeepalive > MAX_KEEPALIVE_FAILURE) { throw new IOException("Max keep alive attempts has been reached"); } - input.ifPresent(in -> { + if (input instanceof BufferedReader in) { try { String command = in.readLine(); waitingKeepaliveResponse = false; @@ -162,7 +181,7 @@ public class Ipx800DeviceConnector extends Thread { } catch (IOException e) { handleException(e); } - }); + } } disconnect(); } catch (IOException e) { diff --git a/bundles/org.openhab.binding.gce/src/main/java/org/openhab/binding/gce/internal/handler/Ipx800v3Handler.java b/bundles/org.openhab.binding.gce/src/main/java/org/openhab/binding/gce/internal/handler/Ipx800v3Handler.java index c909d55a2f6..a250813a36f 100644 --- a/bundles/org.openhab.binding.gce/src/main/java/org/openhab/binding/gce/internal/handler/Ipx800v3Handler.java +++ b/bundles/org.openhab.binding.gce/src/main/java/org/openhab/binding/gce/internal/handler/Ipx800v3Handler.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -73,30 +72,10 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList private static final double ANALOG_SAMPLING = 0.000050354; private final Logger logger = LoggerFactory.getLogger(Ipx800v3Handler.class); - private final Map portDatas = new HashMap<>(); + private final Map portDatas = new HashMap<>(); private @Nullable Ipx800DeviceConnector deviceConnector; - private Optional> refreshJob = Optional.empty(); - - private class LongPressEvaluator implements Runnable { - private final Instant referenceTime; - private final String port; - private final String eventChannelId; - - public LongPressEvaluator(Channel channel, String port, PortData portData) { - this.referenceTime = portData.getTimestamp(); - this.port = port; - this.eventChannelId = "%s-%s".formatted(channel.getUID().getId(), TRIGGER_CONTACT); - } - - @Override - public void run() { - if (portDatas.get(port) instanceof PortData currentData && currentData.getValue() == 1 - && referenceTime.equals(currentData.getTimestamp())) { - triggerChannel(eventChannelId, EVENT_LONG_PRESS); - } - } - } + private List> jobs = new ArrayList<>(); public Ipx800v3Handler(Thing thing) { super(thing); @@ -112,7 +91,7 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList updateStatus(ThingStatus.UNKNOWN); - refreshJob = Optional.of(scheduler.scheduleWithFixedDelay(this::readStatusFile, 1500, config.pullInterval, + jobs.add(scheduler.scheduleWithFixedDelay(this::readStatusFile, 1500, config.pullInterval, TimeUnit.MILLISECONDS)); } @@ -144,7 +123,7 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList int nbElements = status != null ? status.getPorts(portDefinition).size() : portDefinition.quantity; for (int i = 0; i < nbElements; i++) { ChannelUID portChannelUID = createChannels(portDefinition, i, channels); - portDatas.put(portChannelUID.getId(), new PortData()); + portDatas.put(portChannelUID, new PortData()); } }); updateThing(editThing().withChannels(channels).build()); @@ -162,8 +141,8 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList @Override public void dispose() { - refreshJob.ifPresent(job -> job.cancel(true)); - refreshJob = Optional.empty(); + jobs.forEach(job -> job.cancel(true)); + jobs.clear(); if (deviceConnector instanceof Ipx800DeviceConnector connector) { connector.dispose(); @@ -248,10 +227,11 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList public void dataReceived(String port, double value) { updateStatus(ThingStatus.ONLINE); if (thing.getChannel(PortDefinition.asChannelId(port)) instanceof Channel channel) { - String channelId = channel.getUID().getId(); + ChannelUID channelUID = channel.getUID(); + String channelId = channelUID.getId(); - if (portDatas.get(channelId) instanceof PortData portData - && channel.getUID().getGroupId() instanceof String groupId) { + if (portDatas.get(channelUID) instanceof PortData portData + && channelUID.getGroupId() instanceof String groupId) { Instant now = Instant.now(); Configuration configuration = channel.getConfiguration(); PortDefinition portDefinition = PortDefinition.fromGroupId(groupId); @@ -275,8 +255,12 @@ public class Ipx800v3Handler extends BaseThingHandler implements Ipx800EventList if (value == 1) { // CLOSED if (config.longPressTime != 0 && portData.isInitialized()) { - scheduler.schedule(new LongPressEvaluator(channel, port, portData), - config.longPressTime, TimeUnit.MILLISECONDS); + jobs.add(scheduler.schedule(() -> { + if (portData.getValue() == 1 && now.equals(portData.getTimestamp())) { + String eventChannelId = "%s-%s".formatted(channelUID.getId(), TRIGGER_CONTACT); + triggerChannel(eventChannelId, EVENT_LONG_PRESS); + } + }, config.longPressTime, TimeUnit.MILLISECONDS)); } else if (config.pulsePeriod != 0) { portData.setPulsing(scheduler.scheduleWithFixedDelay(() -> { triggerPushButtonChannel(channel, EVENT_PULSE);