mirror of
https://github.com/openhab/openhab-addons.git
synced 2025-01-25 14:55:55 +01:00
[neohub] Improved Web-Socket Communications (#16312)
* [neohub] Improved WebSocket Communications * [neohub] session recycled only by handler; not by socket class --------- Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
This commit is contained in:
parent
271520e73f
commit
a2db13130b
@ -47,8 +47,7 @@ public class NeoHubSocket extends NeoHubSocketBase {
|
||||
IOException caughtException = null;
|
||||
StringBuilder builder = new StringBuilder();
|
||||
|
||||
throttle();
|
||||
try (Socket socket = new Socket()) {
|
||||
try (Socket socket = new Socket(); Throttler throttler = new Throttler();) {
|
||||
int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_TCP;
|
||||
socket.connect(new InetSocketAddress(config.hostName, port), config.socketTimeout * 1000);
|
||||
socket.setSoTimeout(config.socketTimeout * 1000);
|
||||
@ -76,6 +75,8 @@ public class NeoHubSocket extends NeoHubSocketBase {
|
||||
} catch (IOException e) {
|
||||
// catch IOExceptions here, and save them to be re-thrown later
|
||||
caughtException = e;
|
||||
} catch (InterruptedException e) {
|
||||
caughtException = new IOException(e);
|
||||
}
|
||||
|
||||
String responseJson = builder.toString().strip();
|
||||
|
@ -17,6 +17,8 @@ import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
|
||||
@ -33,6 +35,7 @@ public abstract class NeoHubSocketBase implements Closeable {
|
||||
protected final String hubId;
|
||||
|
||||
private static final int REQUEST_INTERVAL_MILLISECS = 1000;
|
||||
private final Lock lock = new ReentrantLock(true);
|
||||
private Optional<Instant> lastRequestTime = Optional.empty();
|
||||
|
||||
public NeoHubSocketBase(NeoHubConfiguration config, String hubId) {
|
||||
@ -51,22 +54,31 @@ public abstract class NeoHubSocketBase implements Closeable {
|
||||
public abstract String sendMessage(final String requestJson) throws IOException, NeoHubException;
|
||||
|
||||
/**
|
||||
* Method for throttling requests to prevent overloading the hub.
|
||||
* Class for throttling requests to prevent overloading the hub.
|
||||
* <p>
|
||||
* The NeoHub can get confused if, while it is uploading data to the cloud, it also receives too many local
|
||||
* requests, so this method throttles the requests to one per REQUEST_INTERVAL_MILLISECS maximum.
|
||||
*
|
||||
* @throws NeoHubException if the wait is interrupted
|
||||
* @throws InterruptedException if the wait is interrupted
|
||||
*/
|
||||
protected synchronized void throttle() throws NeoHubException {
|
||||
try {
|
||||
Instant now = Instant.now();
|
||||
long delay = lastRequestTime
|
||||
.map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)).orElse(0L);
|
||||
lastRequestTime = Optional.of(now.plusMillis(delay));
|
||||
protected class Throttler implements AutoCloseable {
|
||||
|
||||
public Throttler() throws InterruptedException {
|
||||
lock.lock();
|
||||
long delay;
|
||||
synchronized (NeoHubSocketBase.this) {
|
||||
Instant now = Instant.now();
|
||||
delay = lastRequestTime
|
||||
.map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS))
|
||||
.orElse(0L);
|
||||
lastRequestTime = Optional.of(now.plusMillis(delay));
|
||||
}
|
||||
Thread.sleep(delay);
|
||||
} catch (InterruptedException e) {
|
||||
throw new NeoHubException("Throttle sleep interrupted", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,45 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2024 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.binding.neohub.internal;
|
||||
|
||||
import javax.net.ssl.X509ExtendedTrustManager;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.io.net.http.TlsTrustManagerProvider;
|
||||
import org.openhab.core.io.net.http.TrustAllTrustManager;
|
||||
|
||||
/**
|
||||
* A {@link TlsTrustManagerProvider} implementation to validate the NeoHub web socket self signed certificate.
|
||||
*
|
||||
* @author Andrew Fiddian-Green - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class NeoHubTlsTrustManagerProvider implements TlsTrustManagerProvider {
|
||||
|
||||
private final String fullHostName;
|
||||
|
||||
public NeoHubTlsTrustManagerProvider(NeoHubConfiguration config) {
|
||||
fullHostName = String.format("%s:%d", config.hostName,
|
||||
config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_WSS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHostName() {
|
||||
return fullHostName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public X509ExtendedTrustManager getTrustManager() {
|
||||
return TrustAllTrustManager.getInstance();
|
||||
}
|
||||
}
|
@ -16,11 +16,12 @@ import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
|
||||
@ -28,9 +29,12 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.openhab.core.io.net.http.TlsTrustManagerProvider;
|
||||
import org.openhab.core.io.net.http.WebSocketFactory;
|
||||
import org.openhab.core.thing.ThingUID;
|
||||
import org.openhab.core.thing.util.ThingWebClientUtil;
|
||||
import org.osgi.framework.FrameworkUtil;
|
||||
import org.osgi.framework.ServiceRegistration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -55,10 +59,12 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
|
||||
private final Logger logger = LoggerFactory.getLogger(NeoHubWebSocket.class);
|
||||
private final Gson gson = new Gson();
|
||||
private final WebSocketClient webSocketClient;
|
||||
private final ServiceRegistration<?> trustManagerRegistration;
|
||||
|
||||
private @Nullable Session session = null;
|
||||
private String responseOuter = "";
|
||||
private boolean responsePending;
|
||||
private @Nullable IOException websocketException = null;
|
||||
private List<String> responses = new CopyOnWriteArrayList<>();
|
||||
private boolean closing;
|
||||
|
||||
/**
|
||||
* DTO to receive and parse the response JSON.
|
||||
@ -78,16 +84,21 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
|
||||
throws IOException {
|
||||
super(config, bridgeUID.getAsString());
|
||||
|
||||
SslContextFactory.Client sslContextFactory = new SslContextFactory.Client();
|
||||
sslContextFactory.setTrustAll(true);
|
||||
String name = ThingWebClientUtil.buildWebClientConsumerName(bridgeUID, null);
|
||||
webSocketClient = webSocketFactory.createWebSocketClient(name, sslContextFactory);
|
||||
webSocketClient = webSocketFactory.createWebSocketClient(name);
|
||||
webSocketClient.setConnectTimeout(config.socketTimeout * 1000);
|
||||
try {
|
||||
webSocketClient.start();
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Error starting Web Socket client", e);
|
||||
}
|
||||
NeoHubTlsTrustManagerProvider trustManagerProvider = new NeoHubTlsTrustManagerProvider(config);
|
||||
try {
|
||||
trustManagerRegistration = FrameworkUtil.getBundle(getClass()).getBundleContext()
|
||||
.registerService(TlsTrustManagerProvider.class.getName(), trustManagerProvider, null);
|
||||
} catch (IllegalStateException e) {
|
||||
throw new IOException("Error registering trust manager", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -95,14 +106,13 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
|
||||
*
|
||||
* @throws IOException if unable to open the web socket
|
||||
*/
|
||||
private void startSession() throws IOException {
|
||||
private synchronized void startSession() throws IOException {
|
||||
Session session = this.session;
|
||||
if (session == null || !session.isOpen()) {
|
||||
closeSession();
|
||||
try {
|
||||
int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_WSS;
|
||||
URI uri = new URI(String.format("wss://%s:%d", config.hostName, port));
|
||||
webSocketClient.connect(this, uri).get();
|
||||
this.session = webSocketClient.connect(this, uri).get();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Error starting session", e);
|
||||
@ -112,17 +122,6 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the web socket session.
|
||||
*/
|
||||
private void closeSession() {
|
||||
Session session = this.session;
|
||||
this.session = null;
|
||||
if (session != null) {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to escape the quote marks in a JSON string.
|
||||
*
|
||||
@ -154,85 +153,101 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String sendMessage(final String requestJson) throws IOException, NeoHubException {
|
||||
// start the session
|
||||
startSession();
|
||||
|
||||
// session start failed
|
||||
Session session = this.session;
|
||||
if (session == null) {
|
||||
throw new IOException("Session is null");
|
||||
public String sendMessage(final String requestJson) throws IOException, NeoHubException {
|
||||
if (!closing && websocketException != null) {
|
||||
throw websocketException;
|
||||
}
|
||||
|
||||
// wrap the inner request in an outer request string
|
||||
String requestOuter = String.format(REQUEST_OUTER,
|
||||
jsonEscape(String.format(REQUEST_INNER, config.apiToken, jsonReplaceQuotes(requestJson))));
|
||||
try (Throttler throttler = new Throttler()) {
|
||||
// start the session
|
||||
startSession();
|
||||
|
||||
// initialise the response
|
||||
responseOuter = "";
|
||||
responsePending = true;
|
||||
// session start failed
|
||||
Session session = this.session;
|
||||
if (session == null) {
|
||||
throw new IOException("Session is null");
|
||||
}
|
||||
|
||||
IOException caughtException = null;
|
||||
throttle();
|
||||
try {
|
||||
// send the request
|
||||
logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
|
||||
session.getRemote().sendString(requestOuter);
|
||||
logger.trace("hub '{}' sent:{}", hubId, requestOuter);
|
||||
// wrap the inner request in an outer request string
|
||||
String requestOuter = String.format(REQUEST_OUTER,
|
||||
jsonEscape(String.format(REQUEST_INNER, config.apiToken, jsonReplaceQuotes(requestJson))));
|
||||
|
||||
// sleep and loop until we get a response, the socket is closed, or it times out
|
||||
Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
|
||||
while (responsePending) {
|
||||
try {
|
||||
Thread.sleep(SLEEP_MILLISECONDS);
|
||||
if (Instant.now().isAfter(timeout)) {
|
||||
throw new IOException("Read timed out");
|
||||
IOException caughtException = null;
|
||||
try {
|
||||
// send the request
|
||||
logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
|
||||
session.getRemote().sendString(requestOuter);
|
||||
logger.trace("hub '{}' sent:{}", hubId, requestOuter);
|
||||
|
||||
// sleep and loop until we get a response, the socket is closed, or it times out
|
||||
Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
|
||||
while (!closing && websocketException == null && responses.isEmpty()) {
|
||||
try {
|
||||
Thread.sleep(SLEEP_MILLISECONDS);
|
||||
if (Instant.now().isAfter(timeout)) {
|
||||
throw new IOException("Read timed out");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException("Read interrupted", e);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException("Read interrupted", e);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
caughtException = e;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
caughtException = e;
|
||||
}
|
||||
|
||||
caughtException = caughtException != null ? caughtException
|
||||
: this.session == null ? new IOException("WebSocket session closed") : null;
|
||||
String responseOuter = !responses.isEmpty() ? responses.remove(0) : "";
|
||||
logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
|
||||
logger.trace("hub '{}' received:{}", hubId, responseOuter);
|
||||
|
||||
logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
|
||||
logger.trace("hub '{}' received:{}", hubId, responseOuter);
|
||||
// don't throw an exception if already closing
|
||||
if (closing) {
|
||||
return "{}";
|
||||
}
|
||||
|
||||
// if an IOException was caught above, re-throw it again
|
||||
if (caughtException != null) {
|
||||
throw caughtException;
|
||||
}
|
||||
// if an IOException was caught above, re-throw it again
|
||||
caughtException = websocketException != null ? websocketException : caughtException;
|
||||
if (caughtException != null) {
|
||||
throw caughtException;
|
||||
}
|
||||
|
||||
try {
|
||||
Response responseDto = gson.fromJson(responseOuter, Response.class);
|
||||
if (responseDto == null) {
|
||||
throw new JsonSyntaxException("Response DTO is invalid");
|
||||
try {
|
||||
Response responseDto = gson.fromJson(responseOuter, Response.class);
|
||||
if (responseDto == null) {
|
||||
throw new JsonSyntaxException("Response DTO is invalid");
|
||||
}
|
||||
if (!NeoHubBindingConstants.HM_SET_COMMAND_RESPONSE.equals(responseDto.message_type)) {
|
||||
throw new JsonSyntaxException("DTO 'message_type' field is invalid");
|
||||
}
|
||||
String responseJson = responseDto.response;
|
||||
if (responseJson == null) {
|
||||
throw new JsonSyntaxException("DTO 'response' field is null");
|
||||
}
|
||||
responseJson = jsonUnEscape(responseJson).strip();
|
||||
if (!JsonParser.parseString(responseJson).isJsonObject()) {
|
||||
throw new JsonSyntaxException("DTO 'response' field is not a JSON object");
|
||||
}
|
||||
return responseJson;
|
||||
} catch (JsonSyntaxException e) {
|
||||
logger.debug("hub '{}' {}; response:{}", hubId, e.getMessage(), responseOuter);
|
||||
throw new NeoHubException("Invalid response");
|
||||
}
|
||||
if (!NeoHubBindingConstants.HM_SET_COMMAND_RESPONSE.equals(responseDto.message_type)) {
|
||||
throw new JsonSyntaxException("DTO 'message_type' field is invalid");
|
||||
}
|
||||
String responseJson = responseDto.response;
|
||||
if (responseJson == null) {
|
||||
throw new JsonSyntaxException("DTO 'response' field is null");
|
||||
}
|
||||
responseJson = jsonUnEscape(responseJson).strip();
|
||||
if (!JsonParser.parseString(responseJson).isJsonObject()) {
|
||||
throw new JsonSyntaxException("DTO 'response' field is not a JSON object");
|
||||
}
|
||||
return responseJson;
|
||||
} catch (JsonSyntaxException e) {
|
||||
logger.debug("hub '{}' {}; response:{}", hubId, e.getMessage(), responseOuter);
|
||||
throw new NeoHubException("Invalid response");
|
||||
} catch (InterruptedException e) {
|
||||
throw new NeoHubException("Throttler was interrupted unexpectedly");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
closeSession();
|
||||
closing = true;
|
||||
Session session = this.session;
|
||||
if (session != null) {
|
||||
session.close();
|
||||
this.session = null;
|
||||
}
|
||||
try {
|
||||
trustManagerRegistration.unregister();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
try {
|
||||
webSocketClient.stop();
|
||||
} catch (Exception e) {
|
||||
@ -242,25 +257,29 @@ public class NeoHubWebSocket extends NeoHubSocketBase {
|
||||
@OnWebSocketConnect
|
||||
public void onConnect(Session session) {
|
||||
logger.debug("hub '{}' onConnect() ok", hubId);
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
@OnWebSocketClose
|
||||
public void onClose(int statusCode, String reason) {
|
||||
logger.debug("hub '{}' onClose() statusCode:{}, reason:{}", hubId, statusCode, reason);
|
||||
responsePending = false;
|
||||
this.session = null;
|
||||
String closeMessage = String.format("onClose() statusCode:%d, reason:%s", statusCode, reason);
|
||||
logger.debug("hub '{}' {}", hubId, closeMessage);
|
||||
websocketException = new IOException(closeMessage);
|
||||
}
|
||||
|
||||
@OnWebSocketError
|
||||
public void onError(Throwable cause) {
|
||||
logger.debug("hub '{}' onError() cause:{}", hubId, cause.getMessage());
|
||||
closeSession();
|
||||
public void onError(@Nullable Throwable cause) {
|
||||
logger.debug("hub '{}' onError() cause:{}", hubId, cause != null ? cause.getMessage() : "null");
|
||||
websocketException = cause instanceof IOException ioCause ? ioCause : new IOException(cause);
|
||||
}
|
||||
|
||||
@OnWebSocketMessage
|
||||
public void onMessage(String msg) {
|
||||
responseOuter = msg.strip();
|
||||
responsePending = false;
|
||||
public synchronized void onMessage(String msg) {
|
||||
int responseCount = responses.size();
|
||||
if (responseCount > 0) {
|
||||
String errorMessage = String.format("onMessage() too many responses:%d", responseCount);
|
||||
logger.debug("hub '{}' {}", hubId, errorMessage);
|
||||
websocketException = new IOException(errorMessage);
|
||||
}
|
||||
responses.add(msg.strip());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user