Fix sitemap subscription cleanup (#1679)

* Add listener to SseBroadcaster so sseEventSinkRemoved events are handled
* Use Instant instead of long for tracking subscription creation times
* Run cleanup every 2 minutes instead of every 5 minutes

Fixes #1674

Signed-off-by: Wouter Born <github@maindrain.net>
This commit is contained in:
Wouter Born 2020-10-02 08:11:31 +02:00 committed by GitHub
parent 369e678a63
commit 9739271b52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 8 deletions

View File

@ -12,6 +12,8 @@
*/ */
package org.openhab.core.io.rest.sitemap; package org.openhab.core.io.rest.sitemap;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -88,8 +90,8 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener
/* subscription id -> callback */ /* subscription id -> callback */
private final Map<String, SitemapSubscriptionCallback> callbacks = new ConcurrentHashMap<>(); private final Map<String, SitemapSubscriptionCallback> callbacks = new ConcurrentHashMap<>();
/* subscription id -> creation date */ /* subscription id -> creation instant */
private final Map<String, Long> creationDates = new ConcurrentHashMap<>(); private final Map<String, Instant> creationInstants = new ConcurrentHashMap<>();
/* sitemap+page -> listener */ /* sitemap+page -> listener */
private final Map<String, PageChangeListener> pageChangeListeners = new ConcurrentHashMap<>(); private final Map<String, PageChangeListener> pageChangeListeners = new ConcurrentHashMap<>();
@ -107,6 +109,7 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener
protected void deactivate() { protected void deactivate() {
pageOfSubscription.clear(); pageOfSubscription.clear();
callbacks.clear(); callbacks.clear();
creationInstants.clear();
for (PageChangeListener listener : pageChangeListeners.values()) { for (PageChangeListener listener : pageChangeListeners.values()) {
listener.dispose(); listener.dispose();
} }
@ -156,7 +159,7 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener
} }
String subscriptionId = UUID.randomUUID().toString(); String subscriptionId = UUID.randomUUID().toString();
callbacks.put(subscriptionId, callback); callbacks.put(subscriptionId, callback);
creationDates.put(subscriptionId, System.currentTimeMillis()); creationInstants.put(subscriptionId, Instant.now());
logger.debug("Created new subscription with id {} ({} active subscriptions for a max of {})", subscriptionId, logger.debug("Created new subscription with id {} ({} active subscriptions for a max of {})", subscriptionId,
callbacks.size(), maxSubscriptions); callbacks.size(), maxSubscriptions);
return subscriptionId; return subscriptionId;
@ -168,7 +171,7 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener
* @param subscriptionId the id of the subscription to remove * @param subscriptionId the id of the subscription to remove
*/ */
public void removeSubscription(String subscriptionId) { public void removeSubscription(String subscriptionId) {
creationDates.remove(subscriptionId); creationInstants.remove(subscriptionId);
callbacks.remove(subscriptionId); callbacks.remove(subscriptionId);
String sitemapPage = pageOfSubscription.remove(subscriptionId); String sitemapPage = pageOfSubscription.remove(subscriptionId);
if (sitemapPage != null && !pageOfSubscription.values().contains(sitemapPage)) { if (sitemapPage != null && !pageOfSubscription.values().contains(sitemapPage)) {
@ -322,11 +325,11 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener
public void checkAliveClients() { public void checkAliveClients() {
// Release the subscriptions that are not attached to a page // Release the subscriptions that are not attached to a page
for (Entry<String, Long> dateEntry : creationDates.entrySet()) { for (Entry<String, Instant> creationEntry : creationInstants.entrySet()) {
String subscriptionId = dateEntry.getKey(); String subscriptionId = creationEntry.getKey();
SitemapSubscriptionCallback callback = callbacks.get(subscriptionId); SitemapSubscriptionCallback callback = callbacks.get(subscriptionId);
if (getPageId(subscriptionId) == null && callback != null if (getPageId(subscriptionId) == null && callback != null
&& (dateEntry.getValue().longValue() + 30000) < System.currentTimeMillis()) { && (creationEntry.getValue().plus(Duration.ofSeconds(30)).isBefore(Instant.now()))) {
logger.debug("Release subscription {} as sitemap page is not set", subscriptionId); logger.debug("Release subscription {} as sitemap page is not set", subscriptionId);
removeSubscription(subscriptionId); removeSubscription(subscriptionId);
callback.onRelease(subscriptionId); callback.onRelease(subscriptionId);

View File

@ -185,6 +185,7 @@ public class SitemapResource
this.subscriptions = subscriptions; this.subscriptions = subscriptions;
broadcaster = new SseBroadcaster<>(); broadcaster = new SseBroadcaster<>();
broadcaster.addListener(this);
// The clean SSE subscriptions job sends an ALIVE event to all subscribers. This will trigger // The clean SSE subscriptions job sends an ALIVE event to all subscribers. This will trigger
// an exception when the subscriber is dead, leading to the release of the SSE subscription // an exception when the subscriber is dead, leading to the release of the SSE subscription
@ -195,7 +196,7 @@ public class SitemapResource
cleanSubscriptionsJob = scheduler.scheduleAtFixedRate(() -> { cleanSubscriptionsJob = scheduler.scheduleAtFixedRate(() -> {
logger.debug("Run clean SSE subscriptions job"); logger.debug("Run clean SSE subscriptions job");
subscriptions.checkAliveClients(); subscriptions.checkAliveClients();
}, 1, 5, TimeUnit.MINUTES); }, 1, 2, TimeUnit.MINUTES);
} }
@Deactivate @Deactivate
@ -206,6 +207,7 @@ public class SitemapResource
job.cancel(true); job.cancel(true);
cleanSubscriptionsJob = null; cleanSubscriptionsJob = null;
} }
broadcaster.removeListener(this);
broadcaster.close(); broadcaster.close();
} }