From ff0e2293aaa84cc4cdd8202fdf779355069a4a4d Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Fri, 15 Mar 2024 12:45:57 +0000 Subject: [PATCH] [neohub] Improved Web-Socket Communications (#16312) * [neohub] Improved WebSocket Communications * [neohub] session recycled only by handler; not by socket class --------- Signed-off-by: Andrew Fiddian-Green Signed-off-by: Ciprian Pascu --- .../binding/neohub/internal/NeoHubSocket.java | 5 +- .../neohub/internal/NeoHubSocketBase.java | 32 ++- .../NeoHubTlsTrustManagerProvider.java | 45 ++++ .../neohub/internal/NeoHubWebSocket.java | 201 ++++++++++-------- 4 files changed, 180 insertions(+), 103 deletions(-) create mode 100644 bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubTlsTrustManagerProvider.java diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java index a501d687b5c..d0140337885 100644 --- a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java +++ b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocket.java @@ -47,8 +47,7 @@ public class NeoHubSocket extends NeoHubSocketBase { IOException caughtException = null; StringBuilder builder = new StringBuilder(); - throttle(); - try (Socket socket = new Socket()) { + try (Socket socket = new Socket(); Throttler throttler = new Throttler();) { int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_TCP; socket.connect(new InetSocketAddress(config.hostName, port), config.socketTimeout * 1000); socket.setSoTimeout(config.socketTimeout * 1000); @@ -76,6 +75,8 @@ public class NeoHubSocket extends NeoHubSocketBase { } catch (IOException e) { // catch IOExceptions here, and save them to be re-thrown later caughtException = e; + } catch (InterruptedException e) { + caughtException = new IOException(e); } String responseJson = builder.toString().strip(); diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java index 6fd883e428a..69bea74bb56 100644 --- a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java +++ b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubSocketBase.java @@ -17,6 +17,8 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -33,6 +35,7 @@ public abstract class NeoHubSocketBase implements Closeable { protected final String hubId; private static final int REQUEST_INTERVAL_MILLISECS = 1000; + private final Lock lock = new ReentrantLock(true); private Optional lastRequestTime = Optional.empty(); public NeoHubSocketBase(NeoHubConfiguration config, String hubId) { @@ -51,22 +54,31 @@ public abstract class NeoHubSocketBase implements Closeable { public abstract String sendMessage(final String requestJson) throws IOException, NeoHubException; /** - * Method for throttling requests to prevent overloading the hub. + * Class for throttling requests to prevent overloading the hub. *

* The NeoHub can get confused if, while it is uploading data to the cloud, it also receives too many local * requests, so this method throttles the requests to one per REQUEST_INTERVAL_MILLISECS maximum. * - * @throws NeoHubException if the wait is interrupted + * @throws InterruptedException if the wait is interrupted */ - protected synchronized void throttle() throws NeoHubException { - try { - Instant now = Instant.now(); - long delay = lastRequestTime - .map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)).orElse(0L); - lastRequestTime = Optional.of(now.plusMillis(delay)); + protected class Throttler implements AutoCloseable { + + public Throttler() throws InterruptedException { + lock.lock(); + long delay; + synchronized (NeoHubSocketBase.this) { + Instant now = Instant.now(); + delay = lastRequestTime + .map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)) + .orElse(0L); + lastRequestTime = Optional.of(now.plusMillis(delay)); + } Thread.sleep(delay); - } catch (InterruptedException e) { - throw new NeoHubException("Throttle sleep interrupted", e); + } + + @Override + public void close() { + lock.unlock(); } } } diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubTlsTrustManagerProvider.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubTlsTrustManagerProvider.java new file mode 100644 index 00000000000..4dae8396a01 --- /dev/null +++ b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubTlsTrustManagerProvider.java @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.binding.neohub.internal; + +import javax.net.ssl.X509ExtendedTrustManager; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.openhab.core.io.net.http.TlsTrustManagerProvider; +import org.openhab.core.io.net.http.TrustAllTrustManager; + +/** + * A {@link TlsTrustManagerProvider} implementation to validate the NeoHub web socket self signed certificate. + * + * @author Andrew Fiddian-Green - Initial contribution + */ +@NonNullByDefault +public class NeoHubTlsTrustManagerProvider implements TlsTrustManagerProvider { + + private final String fullHostName; + + public NeoHubTlsTrustManagerProvider(NeoHubConfiguration config) { + fullHostName = String.format("%s:%d", config.hostName, + config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_WSS); + } + + @Override + public String getHostName() { + return fullHostName; + } + + @Override + public X509ExtendedTrustManager getTrustManager() { + return TrustAllTrustManager.getInstance(); + } +} diff --git a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java index 721e981e5f3..dabe0b07427 100644 --- a/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java +++ b/bundles/org.openhab.binding.neohub/src/main/java/org/openhab/binding/neohub/internal/NeoHubWebSocket.java @@ -16,11 +16,12 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.time.Instant; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; -import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; @@ -28,9 +29,12 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.openhab.core.io.net.http.TlsTrustManagerProvider; import org.openhab.core.io.net.http.WebSocketFactory; import org.openhab.core.thing.ThingUID; import org.openhab.core.thing.util.ThingWebClientUtil; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,10 +59,12 @@ public class NeoHubWebSocket extends NeoHubSocketBase { private final Logger logger = LoggerFactory.getLogger(NeoHubWebSocket.class); private final Gson gson = new Gson(); private final WebSocketClient webSocketClient; + private final ServiceRegistration trustManagerRegistration; private @Nullable Session session = null; - private String responseOuter = ""; - private boolean responsePending; + private @Nullable IOException websocketException = null; + private List responses = new CopyOnWriteArrayList<>(); + private boolean closing; /** * DTO to receive and parse the response JSON. @@ -78,16 +84,21 @@ public class NeoHubWebSocket extends NeoHubSocketBase { throws IOException { super(config, bridgeUID.getAsString()); - SslContextFactory.Client sslContextFactory = new SslContextFactory.Client(); - sslContextFactory.setTrustAll(true); String name = ThingWebClientUtil.buildWebClientConsumerName(bridgeUID, null); - webSocketClient = webSocketFactory.createWebSocketClient(name, sslContextFactory); + webSocketClient = webSocketFactory.createWebSocketClient(name); webSocketClient.setConnectTimeout(config.socketTimeout * 1000); try { webSocketClient.start(); } catch (Exception e) { throw new IOException("Error starting Web Socket client", e); } + NeoHubTlsTrustManagerProvider trustManagerProvider = new NeoHubTlsTrustManagerProvider(config); + try { + trustManagerRegistration = FrameworkUtil.getBundle(getClass()).getBundleContext() + .registerService(TlsTrustManagerProvider.class.getName(), trustManagerProvider, null); + } catch (IllegalStateException e) { + throw new IOException("Error registering trust manager", e); + } } /** @@ -95,14 +106,13 @@ public class NeoHubWebSocket extends NeoHubSocketBase { * * @throws IOException if unable to open the web socket */ - private void startSession() throws IOException { + private synchronized void startSession() throws IOException { Session session = this.session; if (session == null || !session.isOpen()) { - closeSession(); try { int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_WSS; URI uri = new URI(String.format("wss://%s:%d", config.hostName, port)); - webSocketClient.connect(this, uri).get(); + this.session = webSocketClient.connect(this, uri).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Error starting session", e); @@ -112,17 +122,6 @@ public class NeoHubWebSocket extends NeoHubSocketBase { } } - /** - * Close the web socket session. - */ - private void closeSession() { - Session session = this.session; - this.session = null; - if (session != null) { - session.close(); - } - } - /** * Helper to escape the quote marks in a JSON string. * @@ -154,85 +153,101 @@ public class NeoHubWebSocket extends NeoHubSocketBase { } @Override - public synchronized String sendMessage(final String requestJson) throws IOException, NeoHubException { - // start the session - startSession(); - - // session start failed - Session session = this.session; - if (session == null) { - throw new IOException("Session is null"); + public String sendMessage(final String requestJson) throws IOException, NeoHubException { + if (!closing && websocketException != null) { + throw websocketException; } - // wrap the inner request in an outer request string - String requestOuter = String.format(REQUEST_OUTER, - jsonEscape(String.format(REQUEST_INNER, config.apiToken, jsonReplaceQuotes(requestJson)))); + try (Throttler throttler = new Throttler()) { + // start the session + startSession(); - // initialise the response - responseOuter = ""; - responsePending = true; + // session start failed + Session session = this.session; + if (session == null) { + throw new IOException("Session is null"); + } - IOException caughtException = null; - throttle(); - try { - // send the request - logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length()); - session.getRemote().sendString(requestOuter); - logger.trace("hub '{}' sent:{}", hubId, requestOuter); + // wrap the inner request in an outer request string + String requestOuter = String.format(REQUEST_OUTER, + jsonEscape(String.format(REQUEST_INNER, config.apiToken, jsonReplaceQuotes(requestJson)))); - // sleep and loop until we get a response, the socket is closed, or it times out - Instant timeout = Instant.now().plusSeconds(config.socketTimeout); - while (responsePending) { - try { - Thread.sleep(SLEEP_MILLISECONDS); - if (Instant.now().isAfter(timeout)) { - throw new IOException("Read timed out"); + IOException caughtException = null; + try { + // send the request + logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length()); + session.getRemote().sendString(requestOuter); + logger.trace("hub '{}' sent:{}", hubId, requestOuter); + + // sleep and loop until we get a response, the socket is closed, or it times out + Instant timeout = Instant.now().plusSeconds(config.socketTimeout); + while (!closing && websocketException == null && responses.isEmpty()) { + try { + Thread.sleep(SLEEP_MILLISECONDS); + if (Instant.now().isAfter(timeout)) { + throw new IOException("Read timed out"); + } + } catch (InterruptedException e) { + throw new IOException("Read interrupted", e); } - } catch (InterruptedException e) { - throw new IOException("Read interrupted", e); } + } catch (IOException e) { + caughtException = e; } - } catch (IOException e) { - caughtException = e; - } - caughtException = caughtException != null ? caughtException - : this.session == null ? new IOException("WebSocket session closed") : null; + String responseOuter = !responses.isEmpty() ? responses.remove(0) : ""; + logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length()); + logger.trace("hub '{}' received:{}", hubId, responseOuter); - logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length()); - logger.trace("hub '{}' received:{}", hubId, responseOuter); + // don't throw an exception if already closing + if (closing) { + return "{}"; + } - // if an IOException was caught above, re-throw it again - if (caughtException != null) { - throw caughtException; - } + // if an IOException was caught above, re-throw it again + caughtException = websocketException != null ? websocketException : caughtException; + if (caughtException != null) { + throw caughtException; + } - try { - Response responseDto = gson.fromJson(responseOuter, Response.class); - if (responseDto == null) { - throw new JsonSyntaxException("Response DTO is invalid"); + try { + Response responseDto = gson.fromJson(responseOuter, Response.class); + if (responseDto == null) { + throw new JsonSyntaxException("Response DTO is invalid"); + } + if (!NeoHubBindingConstants.HM_SET_COMMAND_RESPONSE.equals(responseDto.message_type)) { + throw new JsonSyntaxException("DTO 'message_type' field is invalid"); + } + String responseJson = responseDto.response; + if (responseJson == null) { + throw new JsonSyntaxException("DTO 'response' field is null"); + } + responseJson = jsonUnEscape(responseJson).strip(); + if (!JsonParser.parseString(responseJson).isJsonObject()) { + throw new JsonSyntaxException("DTO 'response' field is not a JSON object"); + } + return responseJson; + } catch (JsonSyntaxException e) { + logger.debug("hub '{}' {}; response:{}", hubId, e.getMessage(), responseOuter); + throw new NeoHubException("Invalid response"); } - if (!NeoHubBindingConstants.HM_SET_COMMAND_RESPONSE.equals(responseDto.message_type)) { - throw new JsonSyntaxException("DTO 'message_type' field is invalid"); - } - String responseJson = responseDto.response; - if (responseJson == null) { - throw new JsonSyntaxException("DTO 'response' field is null"); - } - responseJson = jsonUnEscape(responseJson).strip(); - if (!JsonParser.parseString(responseJson).isJsonObject()) { - throw new JsonSyntaxException("DTO 'response' field is not a JSON object"); - } - return responseJson; - } catch (JsonSyntaxException e) { - logger.debug("hub '{}' {}; response:{}", hubId, e.getMessage(), responseOuter); - throw new NeoHubException("Invalid response"); + } catch (InterruptedException e) { + throw new NeoHubException("Throttler was interrupted unexpectedly"); } } @Override public void close() { - closeSession(); + closing = true; + Session session = this.session; + if (session != null) { + session.close(); + this.session = null; + } + try { + trustManagerRegistration.unregister(); + } catch (Exception e) { + } try { webSocketClient.stop(); } catch (Exception e) { @@ -242,25 +257,29 @@ public class NeoHubWebSocket extends NeoHubSocketBase { @OnWebSocketConnect public void onConnect(Session session) { logger.debug("hub '{}' onConnect() ok", hubId); - this.session = session; } @OnWebSocketClose public void onClose(int statusCode, String reason) { - logger.debug("hub '{}' onClose() statusCode:{}, reason:{}", hubId, statusCode, reason); - responsePending = false; - this.session = null; + String closeMessage = String.format("onClose() statusCode:%d, reason:%s", statusCode, reason); + logger.debug("hub '{}' {}", hubId, closeMessage); + websocketException = new IOException(closeMessage); } @OnWebSocketError - public void onError(Throwable cause) { - logger.debug("hub '{}' onError() cause:{}", hubId, cause.getMessage()); - closeSession(); + public void onError(@Nullable Throwable cause) { + logger.debug("hub '{}' onError() cause:{}", hubId, cause != null ? cause.getMessage() : "null"); + websocketException = cause instanceof IOException ioCause ? ioCause : new IOException(cause); } @OnWebSocketMessage - public void onMessage(String msg) { - responseOuter = msg.strip(); - responsePending = false; + public synchronized void onMessage(String msg) { + int responseCount = responses.size(); + if (responseCount > 0) { + String errorMessage = String.format("onMessage() too many responses:%d", responseCount); + logger.debug("hub '{}' {}", hubId, errorMessage); + websocketException = new IOException(errorMessage); + } + responses.add(msg.strip()); } }