From 84d5d3860613ec767132474ccdffd70e992f9b01 Mon Sep 17 00:00:00 2001 From: J-N-K Date: Thu, 16 Jun 2022 19:41:05 +0200 Subject: [PATCH] Fix dangling SSE subscriptions (#2983) Unlike the sitemap SSE subscriptions generic event subscriptions did not implement a connection lost monitor. Signed-off-by: Jan N. Klug --- .../openhab/core/io/rest/sse/SseResource.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/bundles/org.openhab.core.io.rest.sse/src/main/java/org/openhab/core/io/rest/sse/SseResource.java b/bundles/org.openhab.core.io.rest.sse/src/main/java/org/openhab/core/io/rest/sse/SseResource.java index 313853b81..c1051c4ab 100644 --- a/bundles/org.openhab.core.io.rest.sse/src/main/java/org/openhab/core/io/rest/sse/SseResource.java +++ b/bundles/org.openhab.core.io.rest.sse/src/main/java/org/openhab/core/io/rest/sse/SseResource.java @@ -20,6 +20,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import javax.annotation.security.RolesAllowed; import javax.inject.Singleton; @@ -42,6 +45,7 @@ import javax.ws.rs.sse.SseEventSink; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.auth.Role; +import org.openhab.core.common.ThreadPoolManager; import org.openhab.core.events.Event; import org.openhab.core.io.rest.RESTConstants; import org.openhab.core.io.rest.RESTResource; @@ -99,6 +103,10 @@ public class SseResource implements RESTResource, SsePublisher { private final Logger logger = LoggerFactory.getLogger(SseResource.class); + private final ScheduledExecutorService scheduler = ThreadPoolManager + .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON); + private final ScheduledFuture cleanSubscriptionsJob; + private @Context @NonNullByDefault({}) Sse sse; private final SseBroadcaster itemStatesBroadcaster = new SseBroadcaster<>(); @@ -111,6 +119,13 @@ public class SseResource implements RESTResource, SsePublisher { public SseResource(@Reference SseItemStatesEventBuilder itemStatesEventBuilder) { this.executorService = Executors.newSingleThreadExecutor(); this.itemStatesEventBuilder = itemStatesEventBuilder; + + cleanSubscriptionsJob = scheduler.scheduleWithFixedDelay(() -> { + logger.debug("Run clean SSE subscriptions job"); + OutboundSseEvent outboundSseEvent = sse.newEventBuilder().name("event") + .mediaType(MediaType.APPLICATION_JSON_TYPE).data(new ServerAliveEvent()).build(); + topicBroadcaster.send(outboundSseEvent); + }, 1, 2, TimeUnit.MINUTES); } @Deactivate @@ -118,6 +133,7 @@ public class SseResource implements RESTResource, SsePublisher { itemStatesBroadcaster.close(); topicBroadcaster.close(); executorService.shutdown(); + cleanSubscriptionsJob.cancel(true); } @Override @@ -245,4 +261,8 @@ public class SseResource implements RESTResource, SsePublisher { } } } + + private static class ServerAliveEvent { + public final String type = "ALIVE"; + } }