mirror of
https://github.com/openhab/openhab-addons.git
synced 2025-01-10 15:11:59 +01:00
[openhabcloud] Reconnection Fixes (#14251)
* [openhabcloud] Possible connection leak * Creates thread safe reconnection, reduces unnecessary polling on setup, removes unused variables. * adds the reconnect settings to the internal socket.io options. * Up the min reconnect time * Use @ssalonen sugestion for backoff mins and randomness * Reconnect after server initiated disconnect * Remove unhelpful comments Signed-off-by: Dan Cunningham <dan@digitaldan.com>
This commit is contained in:
parent
16f3a3e854
commit
9ba3c07d3e
@ -22,10 +22,13 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
@ -51,6 +54,7 @@ import io.socket.client.Manager;
|
||||
import io.socket.client.Socket;
|
||||
import io.socket.emitter.Emitter;
|
||||
import io.socket.engineio.client.Transport;
|
||||
import io.socket.engineio.client.transports.WebSocket;
|
||||
import io.socket.parser.Packet;
|
||||
import io.socket.parser.Parser;
|
||||
import okhttp3.OkHttpClient.Builder;
|
||||
@ -68,6 +72,15 @@ import okhttp3.logging.HttpLoggingInterceptor.Level;
|
||||
* @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
|
||||
*/
|
||||
public class CloudClient {
|
||||
|
||||
private static final long RECONNECT_MIN = 2_000;
|
||||
|
||||
private static final long RECONNECT_MAX = 60_000;
|
||||
|
||||
private static final double RECONNECT_JITTER = 0.75;
|
||||
|
||||
private static final long READ_TIMEOUT = 60_0000;
|
||||
|
||||
/*
|
||||
* Logger for this class
|
||||
*/
|
||||
@ -108,11 +121,6 @@ public class CloudClient {
|
||||
*/
|
||||
private boolean isConnected;
|
||||
|
||||
/*
|
||||
* This variable holds version of local openHAB
|
||||
*/
|
||||
private String openHABVersion;
|
||||
|
||||
/*
|
||||
* This variable holds instance of Socket.IO client class which provides communication
|
||||
* with the openHAB Cloud
|
||||
@ -139,11 +147,15 @@ public class CloudClient {
|
||||
|
||||
/*
|
||||
* Delay reconnect scheduler pool
|
||||
*
|
||||
*
|
||||
*/
|
||||
protected final ScheduledExecutorService scheduler = ThreadPoolManager
|
||||
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
|
||||
|
||||
@SuppressWarnings("null")
|
||||
private final AtomicReference<Optional<ScheduledFuture<?>>> reconnectFuture = new AtomicReference<>(
|
||||
Optional.empty());
|
||||
|
||||
/**
|
||||
* Constructor of CloudClient
|
||||
*
|
||||
@ -161,9 +173,9 @@ public class CloudClient {
|
||||
this.remoteAccessEnabled = remoteAccessEnabled;
|
||||
this.exposedItems = exposedItems;
|
||||
this.jettyClient = httpClient;
|
||||
reconnectBackoff.setMin(1000);
|
||||
reconnectBackoff.setMax(30_000);
|
||||
reconnectBackoff.setJitter(0.5);
|
||||
reconnectBackoff.setMin(RECONNECT_MIN);
|
||||
reconnectBackoff.setMax(RECONNECT_MAX);
|
||||
reconnectBackoff.setJitter(RECONNECT_JITTER);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -173,17 +185,25 @@ public class CloudClient {
|
||||
public void connect() {
|
||||
try {
|
||||
Options options = new Options();
|
||||
options.transports = new String[] { WebSocket.NAME };
|
||||
options.reconnection = true;
|
||||
options.reconnectionAttempts = Integer.MAX_VALUE;
|
||||
options.reconnectionDelay = RECONNECT_MIN;
|
||||
options.reconnectionDelayMax = RECONNECT_MAX;
|
||||
options.randomizationFactor = RECONNECT_JITTER;
|
||||
options.timeout = READ_TIMEOUT;
|
||||
Builder okHttpBuilder = new Builder();
|
||||
okHttpBuilder.readTimeout(READ_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
if (logger.isTraceEnabled()) {
|
||||
// When trace level logging is enabled, we activate further logging of HTTP calls
|
||||
// of the Socket.IO library
|
||||
Builder okHttpBuilder = new Builder();
|
||||
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
|
||||
loggingInterceptor.setLevel(Level.BASIC);
|
||||
okHttpBuilder.addInterceptor(loggingInterceptor);
|
||||
okHttpBuilder.addNetworkInterceptor(loggingInterceptor);
|
||||
options.callFactory = okHttpBuilder.build();
|
||||
options.webSocketFactory = okHttpBuilder.build();
|
||||
}
|
||||
options.callFactory = okHttpBuilder.build();
|
||||
options.webSocketFactory = okHttpBuilder.build();
|
||||
socket = IO.socket(baseURL, options);
|
||||
URL parsed = new URL(baseURL);
|
||||
protocol = parsed.getProtocol();
|
||||
@ -273,13 +293,17 @@ public class CloudClient {
|
||||
.on(Socket.EVENT_RECONNECT_FAILED,
|
||||
args -> logger.debug("Socket.IO re-connect attempts failed. Stopping reconnection."))//
|
||||
.on(Socket.EVENT_DISCONNECT, args -> {
|
||||
if (args.length > 0) {
|
||||
logger.warn("Socket.IO disconnected: {}", args[0]);
|
||||
} else {
|
||||
logger.warn("Socket.IO disconnected");
|
||||
}
|
||||
String message = args.length > 0 ? args[0].toString() : "";
|
||||
logger.warn("Socket.IO disconnected: {}", message);
|
||||
isConnected = false;
|
||||
onDisconnect();
|
||||
// https://github.com/socketio/socket.io-client/commit/afb952d854e1d8728ce07b7c3a9f0dee2a61ef4e
|
||||
if ("io server disconnect".equals(message)) {
|
||||
socket.close();
|
||||
long delay = reconnectBackoff.duration();
|
||||
logger.warn("Reconnecting after {} ms.", delay);
|
||||
scheduleReconnect(delay);
|
||||
}
|
||||
})//
|
||||
.on(Socket.EVENT_ERROR, args -> {
|
||||
if (CloudClient.this.socket.connected()) {
|
||||
@ -325,12 +349,7 @@ public class CloudClient {
|
||||
logger.warn("Error connecting to the openHAB Cloud instance. Reconnecting.");
|
||||
}
|
||||
socket.close();
|
||||
scheduler.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
socket.connect();
|
||||
}
|
||||
}, delay, TimeUnit.MILLISECONDS);
|
||||
scheduleReconnect(delay);
|
||||
}
|
||||
})//
|
||||
|
||||
@ -671,21 +690,23 @@ public class CloudClient {
|
||||
*/
|
||||
public void shutdown() {
|
||||
logger.info("Shutting down openHAB Cloud service connection");
|
||||
reconnectFuture.get().ifPresent(future -> future.cancel(true));
|
||||
socket.disconnect();
|
||||
}
|
||||
|
||||
public String getOpenHABVersion() {
|
||||
return openHABVersion;
|
||||
}
|
||||
|
||||
public void setOpenHABVersion(String openHABVersion) {
|
||||
this.openHABVersion = openHABVersion;
|
||||
}
|
||||
|
||||
public void setListener(CloudClientListener listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
private void scheduleReconnect(long delay) {
|
||||
reconnectFuture.getAndSet(Optional.of(scheduler.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
socket.connect();
|
||||
}
|
||||
}, delay, TimeUnit.MILLISECONDS))).ifPresent(future -> future.cancel(true));
|
||||
}
|
||||
|
||||
private JSONObject getJSONHeaders(HttpFields httpFields) {
|
||||
JSONObject headersJSON = new JSONObject();
|
||||
try {
|
||||
|
@ -247,7 +247,6 @@ public class CloudService implements ActionService, CloudClientListener, EventSu
|
||||
String localBaseUrl = "http://localhost:" + localPort;
|
||||
cloudClient = new CloudClient(httpClient, InstanceUUID.get(), getSecret(), cloudBaseUrl, localBaseUrl,
|
||||
remoteAccessEnabled, exposedItems);
|
||||
cloudClient.setOpenHABVersion(OpenHAB.getVersion());
|
||||
cloudClient.connect();
|
||||
cloudClient.setListener(this);
|
||||
NotificationAction.cloudService = this;
|
||||
|
Loading…
Reference in New Issue
Block a user