Add live packet to SSE item state connections (#3086)

Signed-off-by: Dan Cunningham <dan@digitaldan.com>
This commit is contained in:
Dan Cunningham 2022-09-26 10:09:33 -07:00 committed by GitHub
parent 065e33f5ab
commit b808ea6d13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -101,11 +101,13 @@ public class SseResource implements RESTResource, SsePublisher {
private static final String X_ACCEL_BUFFERING_HEADER = "X-Accel-Buffering"; private static final String X_ACCEL_BUFFERING_HEADER = "X-Accel-Buffering";
public static final int ALIVE_INTERVAL_SECONDS = 10;
private final Logger logger = LoggerFactory.getLogger(SseResource.class); private final Logger logger = LoggerFactory.getLogger(SseResource.class);
private final ScheduledExecutorService scheduler = ThreadPoolManager private final ScheduledExecutorService scheduler = ThreadPoolManager
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON); .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
private final ScheduledFuture<?> cleanSubscriptionsJob; private final ScheduledFuture<?> aliveEventJob;
private @Context @NonNullByDefault({}) Sse sse; private @Context @NonNullByDefault({}) Sse sse;
@ -120,12 +122,13 @@ public class SseResource implements RESTResource, SsePublisher {
this.executorService = Executors.newSingleThreadExecutor(); this.executorService = Executors.newSingleThreadExecutor();
this.itemStatesEventBuilder = itemStatesEventBuilder; this.itemStatesEventBuilder = itemStatesEventBuilder;
cleanSubscriptionsJob = scheduler.scheduleWithFixedDelay(() -> { aliveEventJob = scheduler.scheduleWithFixedDelay(() -> {
logger.debug("Run clean SSE subscriptions job"); logger.debug("Sending alive event to SSE connections");
OutboundSseEvent outboundSseEvent = sse.newEventBuilder().name("event") OutboundSseEvent aliveEvent = sse.newEventBuilder().name("alive").mediaType(MediaType.APPLICATION_JSON_TYPE)
.mediaType(MediaType.APPLICATION_JSON_TYPE).data(new ServerAliveEvent()).build(); .data(new AliveEvent()).build();
topicBroadcaster.send(outboundSseEvent); itemStatesBroadcaster.send(aliveEvent);
}, 1, 2, TimeUnit.MINUTES); topicBroadcaster.send(aliveEvent);
}, 1, ALIVE_INTERVAL_SECONDS, TimeUnit.SECONDS);
} }
@Deactivate @Deactivate
@ -133,7 +136,7 @@ public class SseResource implements RESTResource, SsePublisher {
itemStatesBroadcaster.close(); itemStatesBroadcaster.close();
topicBroadcaster.close(); topicBroadcaster.close();
executorService.shutdown(); executorService.shutdown();
cleanSubscriptionsJob.cancel(true); aliveEventJob.cancel(true);
} }
@Override @Override
@ -262,7 +265,8 @@ public class SseResource implements RESTResource, SsePublisher {
} }
} }
private static class ServerAliveEvent { private static class AliveEvent {
public final String type = "ALIVE"; public final String type = "ALIVE";
public final int interval = ALIVE_INTERVAL_SECONDS;
} }
} }