[neohub] Avoid too frequent requests to hub (#15743)

* [neohub] throttle requests to hub
* [neohub] handle websocket error; and attempt restart

---------

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
This commit is contained in:
Andrew Fiddian-Green 2023-12-10 10:20:54 +00:00 committed by GitHub
parent c3660e2414
commit 7da863aa49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 22 deletions

View File

@ -15,6 +15,7 @@ package org.openhab.binding.neohub.internal;
import static org.openhab.binding.neohub.internal.NeoHubBindingConstants.*;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
@ -61,6 +62,8 @@ public class NeoHubHandler extends BaseBridgeHandler {
private static final String SEE_README = "See documentation chapter \"Connection Refused Errors\"";
private static final int MAX_FAILED_SEND_ATTEMPTS = 2;
private static final Duration MIN_RESTART_DELAY = Duration.ofSeconds(10);
private static final Duration MAX_RESTART_DELAY = Duration.ofHours(1);
private final Logger logger = LoggerFactory.getLogger(NeoHubHandler.class);
@ -91,6 +94,8 @@ public class NeoHubHandler extends BaseBridgeHandler {
private ApiVersion apiVersion = ApiVersion.LEGACY;
private boolean isApiOnline = false;
private int failedSendAttempts = 0;
private Duration restartDelay = Duration.from(MIN_RESTART_DELAY);
private @Nullable ScheduledFuture<?> restartTask;
public NeoHubHandler(Bridge bridge, WebSocketFactory webSocketFactory) {
super(bridge);
@ -148,21 +153,12 @@ public class NeoHubHandler extends BaseBridgeHandler {
logger.debug("hub '{}' preferLegacyApi={}", getThing().getUID(), config.preferLegacyApi);
}
// create a web or TCP socket based on the port number in the configuration
NeoHubSocketBase socket;
try {
if (config.useWebSocket) {
socket = new NeoHubWebSocket(config, webSocketFactory, thing.getUID());
} else {
socket = new NeoHubSocket(config, thing.getUID().getAsString());
}
} catch (IOException e) {
logger.debug("\"hub '{}' error creating web/tcp socket: '{}'", getThing().getUID(), e.getMessage());
this.config = config;
NeoHubSocketBase socket = createSocket();
if (socket == null) {
return;
}
this.socket = socket;
this.config = config;
/*
* Try to 'ping' the hub, and if there is a 'connection refused', it is probably due to the mobile App |
@ -206,10 +202,39 @@ public class NeoHubHandler extends BaseBridgeHandler {
startFastPollingBurst();
}
/**
* Create a web or TCP socket based on the configuration setting
*/
private @Nullable NeoHubSocketBase createSocket() {
NeoHubConfiguration config = this.config;
if (config == null) {
logger.debug("\"hub '{}' configuration is null", getThing().getUID());
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
} else {
try {
if (config.useWebSocket) {
return new NeoHubWebSocket(config, webSocketFactory, thing.getUID());
} else {
return new NeoHubSocket(config, thing.getUID().getAsString());
}
} catch (IOException e) {
logger.debug("\"hub '{}' error creating web/tcp socket: '{}'", getThing().getUID(), e.getMessage());
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
}
}
return null;
}
@Override
public void dispose() {
if (logger.isDebugEnabled()) {
logger.debug("hub '{}' stop background polling..", getThing().getUID());
logger.debug("hub '{}' shutting down..", getThing().getUID());
}
closeSocket();
ScheduledFuture<?> restartTask = this.restartTask;
if (restartTask != null) {
restartTask.cancel(true);
}
// clean up the lazy polling scheduler
@ -225,14 +250,16 @@ public class NeoHubHandler extends BaseBridgeHandler {
fast.cancel(true);
this.fastPollingScheduler = null;
}
}
private void closeSocket() {
NeoHubSocketBase socket = this.socket;
this.socket = null;
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
}
this.socket = null;
}
}
@ -276,8 +303,7 @@ public class NeoHubHandler extends BaseBridgeHandler {
protected @Nullable NeoHubAbstractDeviceData fromNeoHubGetDeviceData() {
NeoHubSocketBase socket = this.socket;
if (socket == null || config == null) {
logger.warn(MSG_HUB_CONFIG, getThing().getUID());
if (socket == null) {
return null;
}
@ -309,6 +335,7 @@ public class NeoHubHandler extends BaseBridgeHandler {
if (getThing().getStatus() != ThingStatus.ONLINE) {
updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
restartDelay = Duration.from(MIN_RESTART_DELAY);
}
// check if we also need to discard and update systemData
@ -340,8 +367,24 @@ public class NeoHubHandler extends BaseBridgeHandler {
} catch (IOException | NeoHubException e) {
logger.warn(MSG_FMT_DEVICE_POLL_ERR, getThing().getUID(), e.getMessage());
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
return null;
scheduleRestart();
}
return null;
}
private synchronized void scheduleRestart() {
closeSocket();
restartTask = scheduler.schedule(() -> {
NeoHubSocketBase socket = createSocket();
this.socket = socket;
if (!Thread.interrupted() && socket == null) { // keep trying..
restartDelay = restartDelay.plus(restartDelay);
if (restartDelay.compareTo(MAX_RESTART_DELAY) > 0) {
restartDelay = Duration.from(MAX_RESTART_DELAY);
}
scheduleRestart();
}
}, restartDelay.toSeconds(), TimeUnit.SECONDS);
}
/**

View File

@ -47,6 +47,7 @@ public class NeoHubSocket extends NeoHubSocketBase {
IOException caughtException = null;
StringBuilder builder = new StringBuilder();
throttle();
try (Socket socket = new Socket()) {
int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_TCP;
socket.connect(new InetSocketAddress(config.hostName, port), config.socketTimeout * 1000);

View File

@ -14,6 +14,9 @@ package org.openhab.binding.neohub.internal;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import org.eclipse.jdt.annotation.NonNullByDefault;
@ -29,6 +32,9 @@ public abstract class NeoHubSocketBase implements Closeable {
protected final NeoHubConfiguration config;
protected final String hubId;
private static final int REQUEST_INTERVAL_MILLISECS = 1000;
private Optional<Instant> lastRequestTime = Optional.empty();
public NeoHubSocketBase(NeoHubConfiguration config, String hubId) {
this.config = config;
this.hubId = hubId;
@ -43,4 +49,24 @@ public abstract class NeoHubSocketBase implements Closeable {
* @throws NeoHubException if the communication returned a response but the response was not valid JSON
*/
public abstract String sendMessage(final String requestJson) throws IOException, NeoHubException;
/**
* Method for throttling requests to prevent overloading the hub.
* <p>
* The NeoHub can get confused if, while it is uploading data to the cloud, it also receives too many local
* requests, so this method throttles the requests to one per REQUEST_INTERVAL_MILLISECS maximum.
*
* @throws NeoHubException if the wait is interrupted
*/
protected synchronized void throttle() throws NeoHubException {
try {
Instant now = Instant.now();
long delay = lastRequestTime
.map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)).orElse(0L);
lastRequestTime = Optional.of(now.plusMillis(delay));
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new NeoHubException("Throttle sleep interrupted", e);
}
}
}

View File

@ -15,6 +15,7 @@ package org.openhab.binding.neohub.internal;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import org.eclipse.jdt.annotation.NonNullByDefault;
@ -116,9 +117,9 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
*/
private void closeSession() {
Session session = this.session;
this.session = null;
if (session != null) {
session.close();
this.session = null;
}
}
@ -172,19 +173,19 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
responsePending = true;
IOException caughtException = null;
throttle();
try {
// send the request
logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
session.getRemote().sendString(requestOuter);
logger.trace("hub '{}' sent:{}", hubId, requestOuter);
// sleep and loop until we get a response or the socket is closed
int sleepRemainingMilliseconds = config.socketTimeout * 1000;
// sleep and loop until we get a response, the socket is closed, or it times out
Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
while (responsePending) {
try {
Thread.sleep(SLEEP_MILLISECONDS);
sleepRemainingMilliseconds = sleepRemainingMilliseconds - SLEEP_MILLISECONDS;
if (sleepRemainingMilliseconds <= 0) {
if (Instant.now().isAfter(timeout)) {
throw new IOException("Read timed out");
}
} catch (InterruptedException e) {
@ -195,6 +196,9 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
caughtException = e;
}
caughtException = caughtException != null ? caughtException
: this.session == null ? new IOException("WebSocket session closed") : null;
logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
logger.trace("hub '{}' received:{}", hubId, responseOuter);