[deconz] beackport of websocket connections improvements (#9307)

* improve websocket connection
* improvement
* backport review comment

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
This commit is contained in:
J-N-K 2020-12-09 23:09:18 +01:00 committed by GitHub
parent ad2a1e3635
commit 0ebd9af3e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 22 deletions

View File

@ -228,10 +228,11 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
/**
* Starts the websocket connection.
*
* {@link #requestFullState} need to be called first to obtain the websocket port.
*/
private void startWebsocket() {
if (websocket.isConnected() || websocketPort == 0) {
if (websocket.isConnected() || websocketPort == 0 || websocketReconnect == false) {
return;
}
@ -284,10 +285,8 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
}
stopTimer();
// Wait for POLL_FREQUENCY_SEC after a connection error before trying again
if (websocketReconnect) {
scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
}
}
@Override
public void connectionEstablished() {
@ -298,10 +297,8 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
@Override
public void connectionLost(String reason) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason);
if (websocketReconnect) {
startWebsocket();
}
}
/**
* Return the websocket connection.

View File

@ -17,6 +17,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@ -45,29 +46,33 @@ public class WebSocketConnection {
private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final WebSocketClient client;
private final String socketName;
private final Gson gson;
private final WebSocketConnectionListener connectionListener;
private final Map<String, WebSocketMessageListener> sensorListener = new ConcurrentHashMap<>();
private final Map<String, WebSocketMessageListener> lightListener = new ConcurrentHashMap<>();
private final Gson gson;
private boolean connected = false;
private ConnectionState connectionState = ConnectionState.DISCONNECTED;
public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) {
this.connectionListener = listener;
this.client = client;
this.client.setMaxIdleTimeout(0);
this.gson = gson;
this.socketName = ((QueuedThreadPool) client.getExecutor()).getName() + "$" + this.hashCode();
}
public void start(String ip) {
if (connected) {
if (connectionState == ConnectionState.CONNECTED) {
return;
} else if (connectionState == ConnectionState.CONNECTING) {
logger.debug("{} already connecting", socketName);
return;
}
try {
URI destUri = URI.create("ws://" + ip);
client.start();
logger.debug("Connecting to: {}", destUri);
logger.debug("Trying to connect {} to {}", socketName, destUri);
client.connect(this, destUri).get();
} catch (Exception e) {
connectionListener.connectionError(e);
@ -76,10 +81,10 @@ public class WebSocketConnection {
public void close() {
try {
connected = false;
connectionState = ConnectionState.DISCONNECTING;
client.stop();
} catch (Exception e) {
logger.debug("Error while closing connection", e);
logger.debug("{} encountered an error while closing connection", socketName, e);
}
client.destroy();
}
@ -100,17 +105,18 @@ public class WebSocketConnection {
sensorListener.remove(lightID);
}
@SuppressWarnings("unused")
@OnWebSocketConnect
public void onConnect(Session session) {
connected = true;
logger.debug("Connect: {}", session.getRemoteAddress().getAddress());
connectionState = ConnectionState.CONNECTED;
logger.debug("{} successfully connected to {}", this, session.getRemoteAddress().getAddress());
connectionListener.connectionEstablished();
}
@SuppressWarnings("null")
@SuppressWarnings("null, unused")
@OnWebSocketMessage
public void onMessage(String message) {
logger.trace("Raw data received by websocket: {}", message);
logger.trace("Raw data received by websocket {}: {}", socketName, message);
DeconzBaseMessage changedMessage = gson.fromJson(message, DeconzBaseMessage.class);
switch (changedMessage.r) {
case "sensors":
@ -134,19 +140,36 @@ public class WebSocketConnection {
}
}
@SuppressWarnings("unused")
@OnWebSocketError
public void onError(Throwable cause) {
connected = false;
connectionState = ConnectionState.DISCONNECTED;
connectionListener.connectionError(cause);
}
@SuppressWarnings("unused")
@OnWebSocketClose
public void onClose(int statusCode, String reason) {
connected = false;
connectionState = ConnectionState.DISCONNECTED;
connectionListener.connectionLost(reason);
}
/**
* check connection state (successfully connected)
*
* @return true if connected, false if connecting, disconnecting or disconnected
*/
public boolean isConnected() {
return connected;
return connectionState == ConnectionState.CONNECTED;
}
/**
* used internally to represent the connection state
*/
private enum ConnectionState {
CONNECTING,
CONNECTED,
DISCONNECTING,
DISCONNECTED
}
}