From 7da863aa49f0556fdd1b74d5912093fe4d02b47c Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Sun, 10 Dec 2023 10:20:54 +0000 Subject: [PATCH] [neohub] Avoid too frequent requests to hub (#15743) * [neohub] throttle requests to hub * [neohub] handle websocket error; and attempt restart --------- Signed-off-by: Andrew Fiddian-Green --- .../neohub/internal/NeoHubHandler.java | 77 +++++++++++++++---- .../binding/neohub/internal/NeoHubSocket.java | 1 + .../neohub/internal/NeoHubSocketBase.java | 26 +++++++ .../neohub/internal/NeoHubWebSocket.java | 14 ++-- 4 files changed, 96 insertions(+), 22 deletions(-) diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubHandler.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubHandler.java index 2e9e42722f2..3659ffc72f3 100644 --- a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubHandler.java +++ b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubHandler.java @@ -15,6 +15,7 @@ package org.openhab.binding.neohub.internal; import static org.openhab.binding.neohub.internal.NeoHubBindingConstants.*; import java.io.IOException; +import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.HashMap; @@ -61,6 +62,8 @@ public class NeoHubHandler extends BaseBridgeHandler { private static final String SEE_README = "See documentation chapter \"Connection Refused Errors\""; private static final int MAX_FAILED_SEND_ATTEMPTS = 2; + private static final Duration MIN_RESTART_DELAY = Duration.ofSeconds(10); + private static final Duration MAX_RESTART_DELAY = Duration.ofHours(1); private final Logger logger = LoggerFactory.getLogger(NeoHubHandler.class); @@ -91,6 +94,8 @@ public class NeoHubHandler extends BaseBridgeHandler { private ApiVersion apiVersion = ApiVersion.LEGACY; private boolean isApiOnline = false; private int failedSendAttempts = 0; + private Duration restartDelay = Duration.from(MIN_RESTART_DELAY); + private @Nullable ScheduledFuture restartTask; public NeoHubHandler(Bridge bridge, WebSocketFactory webSocketFactory) { super(bridge); @@ -148,21 +153,12 @@ public class NeoHubHandler extends BaseBridgeHandler { logger.debug("hub '{}' preferLegacyApi={}", getThing().getUID(), config.preferLegacyApi); } - // create a web or TCP socket based on the port number in the configuration - NeoHubSocketBase socket; - try { - if (config.useWebSocket) { - socket = new NeoHubWebSocket(config, webSocketFactory, thing.getUID()); - } else { - socket = new NeoHubSocket(config, thing.getUID().getAsString()); - } - } catch (IOException e) { - logger.debug("\"hub '{}' error creating web/tcp socket: '{}'", getThing().getUID(), e.getMessage()); + this.config = config; + NeoHubSocketBase socket = createSocket(); + if (socket == null) { return; } - this.socket = socket; - this.config = config; /* * Try to 'ping' the hub, and if there is a 'connection refused', it is probably due to the mobile App | @@ -206,10 +202,39 @@ public class NeoHubHandler extends BaseBridgeHandler { startFastPollingBurst(); } + /** + * Create a web or TCP socket based on the configuration setting + */ + private @Nullable NeoHubSocketBase createSocket() { + NeoHubConfiguration config = this.config; + if (config == null) { + logger.debug("\"hub '{}' configuration is null", getThing().getUID()); + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR); + } else { + try { + if (config.useWebSocket) { + return new NeoHubWebSocket(config, webSocketFactory, thing.getUID()); + } else { + return new NeoHubSocket(config, thing.getUID().getAsString()); + } + } catch (IOException e) { + logger.debug("\"hub '{}' error creating web/tcp socket: '{}'", getThing().getUID(), e.getMessage()); + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR); + } + } + return null; + } + @Override public void dispose() { if (logger.isDebugEnabled()) { - logger.debug("hub '{}' stop background polling..", getThing().getUID()); + logger.debug("hub '{}' shutting down..", getThing().getUID()); + } + + closeSocket(); + ScheduledFuture restartTask = this.restartTask; + if (restartTask != null) { + restartTask.cancel(true); } // clean up the lazy polling scheduler @@ -225,14 +250,16 @@ public class NeoHubHandler extends BaseBridgeHandler { fast.cancel(true); this.fastPollingScheduler = null; } + } + private void closeSocket() { NeoHubSocketBase socket = this.socket; + this.socket = null; if (socket != null) { try { socket.close(); } catch (IOException e) { } - this.socket = null; } } @@ -276,8 +303,7 @@ public class NeoHubHandler extends BaseBridgeHandler { protected @Nullable NeoHubAbstractDeviceData fromNeoHubGetDeviceData() { NeoHubSocketBase socket = this.socket; - if (socket == null || config == null) { - logger.warn(MSG_HUB_CONFIG, getThing().getUID()); + if (socket == null) { return null; } @@ -309,6 +335,7 @@ public class NeoHubHandler extends BaseBridgeHandler { if (getThing().getStatus() != ThingStatus.ONLINE) { updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE); + restartDelay = Duration.from(MIN_RESTART_DELAY); } // check if we also need to discard and update systemData @@ -340,8 +367,24 @@ public class NeoHubHandler extends BaseBridgeHandler { } catch (IOException | NeoHubException e) { logger.warn(MSG_FMT_DEVICE_POLL_ERR, getThing().getUID(), e.getMessage()); updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR); - return null; + scheduleRestart(); } + return null; + } + + private synchronized void scheduleRestart() { + closeSocket(); + restartTask = scheduler.schedule(() -> { + NeoHubSocketBase socket = createSocket(); + this.socket = socket; + if (!Thread.interrupted() && socket == null) { // keep trying.. + restartDelay = restartDelay.plus(restartDelay); + if (restartDelay.compareTo(MAX_RESTART_DELAY) > 0) { + restartDelay = Duration.from(MAX_RESTART_DELAY); + } + scheduleRestart(); + } + }, restartDelay.toSeconds(), TimeUnit.SECONDS); } /** diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java index 20a40816c05..b538430c356 100644 --- a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java +++ b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java @@ -47,6 +47,7 @@ public class NeoHubSocket extends NeoHubSocketBase { IOException caughtException = null; StringBuilder builder = new StringBuilder(); + throttle(); try (Socket socket = new Socket()) { int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_TCP; socket.connect(new InetSocketAddress(config.hostName, port), config.socketTimeout * 1000); diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java index 1394effe526..ad3e7b7eb1a 100644 --- a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java +++ b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java @@ -14,6 +14,9 @@ package org.openhab.binding.neohub.internal; import java.io.Closeable; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -29,6 +32,9 @@ public abstract class NeoHubSocketBase implements Closeable { protected final NeoHubConfiguration config; protected final String hubId; + private static final int REQUEST_INTERVAL_MILLISECS = 1000; + private Optional lastRequestTime = Optional.empty(); + public NeoHubSocketBase(NeoHubConfiguration config, String hubId) { this.config = config; this.hubId = hubId; @@ -43,4 +49,24 @@ public abstract class NeoHubSocketBase implements Closeable { * @throws NeoHubException if the communication returned a response but the response was not valid JSON */ public abstract String sendMessage(final String requestJson) throws IOException, NeoHubException; + + /** + * Method for throttling requests to prevent overloading the hub. + *

+ * The NeoHub can get confused if, while it is uploading data to the cloud, it also receives too many local + * requests, so this method throttles the requests to one per REQUEST_INTERVAL_MILLISECS maximum. + * + * @throws NeoHubException if the wait is interrupted + */ + protected synchronized void throttle() throws NeoHubException { + try { + Instant now = Instant.now(); + long delay = lastRequestTime + .map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)).orElse(0L); + lastRequestTime = Optional.of(now.plusMillis(delay)); + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new NeoHubException("Throttle sleep interrupted", e); + } + } } diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java index 7d24d9b8078..64c08958427 100644 --- a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java +++ b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java @@ -15,6 +15,7 @@ package org.openhab.binding.neohub.internal; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Instant; import java.util.concurrent.ExecutionException; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -116,9 +117,9 @@ public class NeoHubWebSocket extends NeoHubSocketBase { */ private void closeSession() { Session session = this.session; + this.session = null; if (session != null) { session.close(); - this.session = null; } } @@ -172,19 +173,19 @@ public class NeoHubWebSocket extends NeoHubSocketBase { responsePending = true; IOException caughtException = null; + throttle(); try { // send the request logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length()); session.getRemote().sendString(requestOuter); logger.trace("hub '{}' sent:{}", hubId, requestOuter); - // sleep and loop until we get a response or the socket is closed - int sleepRemainingMilliseconds = config.socketTimeout * 1000; + // sleep and loop until we get a response, the socket is closed, or it times out + Instant timeout = Instant.now().plusSeconds(config.socketTimeout); while (responsePending) { try { Thread.sleep(SLEEP_MILLISECONDS); - sleepRemainingMilliseconds = sleepRemainingMilliseconds - SLEEP_MILLISECONDS; - if (sleepRemainingMilliseconds <= 0) { + if (Instant.now().isAfter(timeout)) { throw new IOException("Read timed out"); } } catch (InterruptedException e) { @@ -195,6 +196,9 @@ public class NeoHubWebSocket extends NeoHubSocketBase { caughtException = e; } + caughtException = caughtException != null ? caughtException + : this.session == null ? new IOException("WebSocket session closed") : null; + logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length()); logger.trace("hub '{}' received:{}", hubId, responseOuter);