[neohub] Improved Web-Socket Communications (#16312)

* [neohub] Improved WebSocket Communications
* [neohub] session recycled only by handler; not by socket class

---------

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
Signed-off-by: Ciprian Pascu <contact@ciprianpascu.ro>
This commit is contained in:
Andrew Fiddian-Green 2024-03-15 12:45:57 +00:00 committed by Ciprian Pascu
parent 0f2acfd28c
commit ff0e2293aa
4 changed files with 180 additions and 103 deletions

View File

@ -47,8 +47,7 @@ public class NeoHubSocket extends NeoHubSocketBase {
IOException caughtException = null;
StringBuilder builder = new StringBuilder();
throttle();
try (Socket socket = new Socket()) {
try (Socket socket = new Socket(); Throttler throttler = new Throttler();) {
int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_TCP;
socket.connect(new InetSocketAddress(config.hostName, port), config.socketTimeout * 1000);
socket.setSoTimeout(config.socketTimeout * 1000);
@ -76,6 +75,8 @@ public class NeoHubSocket extends NeoHubSocketBase {
} catch (IOException e) {
// catch IOExceptions here, and save them to be re-thrown later
caughtException = e;
} catch (InterruptedException e) {
caughtException = new IOException(e);
}
String responseJson = builder.toString().strip();

View File

@ -17,6 +17,8 @@ import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jdt.annotation.NonNullByDefault;
@ -33,6 +35,7 @@ public abstract class NeoHubSocketBase implements Closeable {
protected final String hubId;
private static final int REQUEST_INTERVAL_MILLISECS = 1000;
private final Lock lock = new ReentrantLock(true);
private Optional<Instant> lastRequestTime = Optional.empty();
public NeoHubSocketBase(NeoHubConfiguration config, String hubId) {
@ -51,22 +54,31 @@ public abstract class NeoHubSocketBase implements Closeable {
public abstract String sendMessage(final String requestJson) throws IOException, NeoHubException;
/**
* Method for throttling requests to prevent overloading the hub.
* Class for throttling requests to prevent overloading the hub.
* <p>
* The NeoHub can get confused if, while it is uploading data to the cloud, it also receives too many local
* requests, so this method throttles the requests to one per REQUEST_INTERVAL_MILLISECS maximum.
*
* @throws NeoHubException if the wait is interrupted
* @throws InterruptedException if the wait is interrupted
*/
protected synchronized void throttle() throws NeoHubException {
try {
Instant now = Instant.now();
long delay = lastRequestTime
.map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)).orElse(0L);
lastRequestTime = Optional.of(now.plusMillis(delay));
protected class Throttler implements AutoCloseable {
public Throttler() throws InterruptedException {
lock.lock();
long delay;
synchronized (NeoHubSocketBase.this) {
Instant now = Instant.now();
delay = lastRequestTime
.map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS))
.orElse(0L);
lastRequestTime = Optional.of(now.plusMillis(delay));
}
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new NeoHubException("Throttle sleep interrupted", e);
}
@Override
public void close() {
lock.unlock();
}
}
}

View File

@ -0,0 +1,45 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.neohub.internal;
import javax.net.ssl.X509ExtendedTrustManager;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.io.net.http.TlsTrustManagerProvider;
import org.openhab.core.io.net.http.TrustAllTrustManager;
/**
* A {@link TlsTrustManagerProvider} implementation to validate the NeoHub web socket self signed certificate.
*
* @author Andrew Fiddian-Green - Initial contribution
*/
@NonNullByDefault
public class NeoHubTlsTrustManagerProvider implements TlsTrustManagerProvider {
private final String fullHostName;
public NeoHubTlsTrustManagerProvider(NeoHubConfiguration config) {
fullHostName = String.format("%s:%d", config.hostName,
config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_WSS);
}
@Override
public String getHostName() {
return fullHostName;
}
@Override
public X509ExtendedTrustManager getTrustManager() {
return TrustAllTrustManager.getInstance();
}
}

View File

@ -16,11 +16,12 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@ -28,9 +29,12 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.openhab.core.io.net.http.TlsTrustManagerProvider;
import org.openhab.core.io.net.http.WebSocketFactory;
import org.openhab.core.thing.ThingUID;
import org.openhab.core.thing.util.ThingWebClientUtil;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,10 +59,12 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
private final Logger logger = LoggerFactory.getLogger(NeoHubWebSocket.class);
private final Gson gson = new Gson();
private final WebSocketClient webSocketClient;
private final ServiceRegistration<?> trustManagerRegistration;
private @Nullable Session session = null;
private String responseOuter = "";
private boolean responsePending;
private @Nullable IOException websocketException = null;
private List<String> responses = new CopyOnWriteArrayList<>();
private boolean closing;
/**
* DTO to receive and parse the response JSON.
@ -78,16 +84,21 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
throws IOException {
super(config, bridgeUID.getAsString());
SslContextFactory.Client sslContextFactory = new SslContextFactory.Client();
sslContextFactory.setTrustAll(true);
String name = ThingWebClientUtil.buildWebClientConsumerName(bridgeUID, null);
webSocketClient = webSocketFactory.createWebSocketClient(name, sslContextFactory);
webSocketClient = webSocketFactory.createWebSocketClient(name);
webSocketClient.setConnectTimeout(config.socketTimeout * 1000);
try {
webSocketClient.start();
} catch (Exception e) {
throw new IOException("Error starting Web Socket client", e);
}
NeoHubTlsTrustManagerProvider trustManagerProvider = new NeoHubTlsTrustManagerProvider(config);
try {
trustManagerRegistration = FrameworkUtil.getBundle(getClass()).getBundleContext()
.registerService(TlsTrustManagerProvider.class.getName(), trustManagerProvider, null);
} catch (IllegalStateException e) {
throw new IOException("Error registering trust manager", e);
}
}
/**
@ -95,14 +106,13 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
*
* @throws IOException if unable to open the web socket
*/
private void startSession() throws IOException {
private synchronized void startSession() throws IOException {
Session session = this.session;
if (session == null || !session.isOpen()) {
closeSession();
try {
int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_WSS;
URI uri = new URI(String.format("wss://%s:%d", config.hostName, port));
webSocketClient.connect(this, uri).get();
this.session = webSocketClient.connect(this, uri).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Error starting session", e);
@ -112,17 +122,6 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
}
}
/**
* Close the web socket session.
*/
private void closeSession() {
Session session = this.session;
this.session = null;
if (session != null) {
session.close();
}
}
/**
* Helper to escape the quote marks in a JSON string.
*
@ -154,85 +153,101 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
}
@Override
public synchronized String sendMessage(final String requestJson) throws IOException, NeoHubException {
// start the session
startSession();
// session start failed
Session session = this.session;
if (session == null) {
throw new IOException("Session is null");
public String sendMessage(final String requestJson) throws IOException, NeoHubException {
if (!closing && websocketException != null) {
throw websocketException;
}
// wrap the inner request in an outer request string
String requestOuter = String.format(REQUEST_OUTER,
jsonEscape(String.format(REQUEST_INNER, config.apiToken, jsonReplaceQuotes(requestJson))));
try (Throttler throttler = new Throttler()) {
// start the session
startSession();
// initialise the response
responseOuter = "";
responsePending = true;
// session start failed
Session session = this.session;
if (session == null) {
throw new IOException("Session is null");
}
IOException caughtException = null;
throttle();
try {
// send the request
logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
session.getRemote().sendString(requestOuter);
logger.trace("hub '{}' sent:{}", hubId, requestOuter);
// wrap the inner request in an outer request string
String requestOuter = String.format(REQUEST_OUTER,
jsonEscape(String.format(REQUEST_INNER, config.apiToken, jsonReplaceQuotes(requestJson))));
// sleep and loop until we get a response, the socket is closed, or it times out
Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
while (responsePending) {
try {
Thread.sleep(SLEEP_MILLISECONDS);
if (Instant.now().isAfter(timeout)) {
throw new IOException("Read timed out");
IOException caughtException = null;
try {
// send the request
logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
session.getRemote().sendString(requestOuter);
logger.trace("hub '{}' sent:{}", hubId, requestOuter);
// sleep and loop until we get a response, the socket is closed, or it times out
Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
while (!closing && websocketException == null && responses.isEmpty()) {
try {
Thread.sleep(SLEEP_MILLISECONDS);
if (Instant.now().isAfter(timeout)) {
throw new IOException("Read timed out");
}
} catch (InterruptedException e) {
throw new IOException("Read interrupted", e);
}
} catch (InterruptedException e) {
throw new IOException("Read interrupted", e);
}
} catch (IOException e) {
caughtException = e;
}
} catch (IOException e) {
caughtException = e;
}
caughtException = caughtException != null ? caughtException
: this.session == null ? new IOException("WebSocket session closed") : null;
String responseOuter = !responses.isEmpty() ? responses.remove(0) : "";
logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
logger.trace("hub '{}' received:{}", hubId, responseOuter);
logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
logger.trace("hub '{}' received:{}", hubId, responseOuter);
// don't throw an exception if already closing
if (closing) {
return "{}";
}
// if an IOException was caught above, re-throw it again
if (caughtException != null) {
throw caughtException;
}
// if an IOException was caught above, re-throw it again
caughtException = websocketException != null ? websocketException : caughtException;
if (caughtException != null) {
throw caughtException;
}
try {
Response responseDto = gson.fromJson(responseOuter, Response.class);
if (responseDto == null) {
throw new JsonSyntaxException("Response DTO is invalid");
try {
Response responseDto = gson.fromJson(responseOuter, Response.class);
if (responseDto == null) {
throw new JsonSyntaxException("Response DTO is invalid");
}
if (!NeoHubBindingConstants.HM_SET_COMMAND_RESPONSE.equals(responseDto.message_type)) {
throw new JsonSyntaxException("DTO 'message_type' field is invalid");
}
String responseJson = responseDto.response;
if (responseJson == null) {
throw new JsonSyntaxException("DTO 'response' field is null");
}
responseJson = jsonUnEscape(responseJson).strip();
if (!JsonParser.parseString(responseJson).isJsonObject()) {
throw new JsonSyntaxException("DTO 'response' field is not a JSON object");
}
return responseJson;
} catch (JsonSyntaxException e) {
logger.debug("hub '{}' {}; response:{}", hubId, e.getMessage(), responseOuter);
throw new NeoHubException("Invalid response");
}
if (!NeoHubBindingConstants.HM_SET_COMMAND_RESPONSE.equals(responseDto.message_type)) {
throw new JsonSyntaxException("DTO 'message_type' field is invalid");
}
String responseJson = responseDto.response;
if (responseJson == null) {
throw new JsonSyntaxException("DTO 'response' field is null");
}
responseJson = jsonUnEscape(responseJson).strip();
if (!JsonParser.parseString(responseJson).isJsonObject()) {
throw new JsonSyntaxException("DTO 'response' field is not a JSON object");
}
return responseJson;
} catch (JsonSyntaxException e) {
logger.debug("hub '{}' {}; response:{}", hubId, e.getMessage(), responseOuter);
throw new NeoHubException("Invalid response");
} catch (InterruptedException e) {
throw new NeoHubException("Throttler was interrupted unexpectedly");
}
}
@Override
public void close() {
closeSession();
closing = true;
Session session = this.session;
if (session != null) {
session.close();
this.session = null;
}
try {
trustManagerRegistration.unregister();
} catch (Exception e) {
}
try {
webSocketClient.stop();
} catch (Exception e) {
@ -242,25 +257,29 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
@OnWebSocketConnect
public void onConnect(Session session) {
logger.debug("hub '{}' onConnect() ok", hubId);
this.session = session;
}
@OnWebSocketClose
public void onClose(int statusCode, String reason) {
logger.debug("hub '{}' onClose() statusCode:{}, reason:{}", hubId, statusCode, reason);
responsePending = false;
this.session = null;
String closeMessage = String.format("onClose() statusCode:%d, reason:%s", statusCode, reason);
logger.debug("hub '{}' {}", hubId, closeMessage);
websocketException = new IOException(closeMessage);
}
@OnWebSocketError
public void onError(Throwable cause) {
logger.debug("hub '{}' onError() cause:{}", hubId, cause.getMessage());
closeSession();
public void onError(@Nullable Throwable cause) {
logger.debug("hub '{}' onError() cause:{}", hubId, cause != null ? cause.getMessage() : "null");
websocketException = cause instanceof IOException ioCause ? ioCause : new IOException(cause);
}
@OnWebSocketMessage
public void onMessage(String msg) {
responseOuter = msg.strip();
responsePending = false;
public synchronized void onMessage(String msg) {
int responseCount = responses.size();
if (responseCount > 0) {
String errorMessage = String.format("onMessage() too many responses:%d", responseCount);
logger.debug("hub '{}' {}", hubId, errorMessage);
websocketException = new IOException(errorMessage);
}
responses.add(msg.strip());
}
}