Conditionally use a thread pool backed sequential executor for DSL rules and events (#3890)

Signed-off-by: Jörg Sautter <joerg.sautter@gmx.net>
This commit is contained in:
joerg1985 2024-04-29 09:11:56 +02:00 committed by GitHub
parent 81f2bd9366
commit c3ada84b77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 660 additions and 6 deletions

View File

@ -13,7 +13,6 @@
package org.openhab.core.automation.internal;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@ -23,7 +22,7 @@ import org.openhab.core.automation.RuleStatus;
import org.openhab.core.automation.RuleStatusInfo;
import org.openhab.core.automation.Trigger;
import org.openhab.core.automation.handler.TriggerHandlerCallback;
import org.openhab.core.common.NamedThreadFactory;
import org.openhab.core.common.ThreadPoolManager;
/**
* This class is implementation of {@link TriggerHandlerCallback} used by the {@link Trigger}s to notify rule engine
@ -48,7 +47,7 @@ public class TriggerHandlerCallbackImpl implements TriggerHandlerCallback {
protected TriggerHandlerCallbackImpl(RuleEngineImpl re, String ruleUID) {
this.re = re;
this.ruleUID = ruleUID;
executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("rule-" + ruleUID));
this.executor = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("rules", "rule-" + ruleUID);
}
@Override

View File

@ -0,0 +1,487 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.core.common;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* A ScheduledExecutorService that will sequentially perform the tasks like a
* {@link Executors#newSingleThreadScheduledExecutor} backed by a thread pool.
* This is a drop in replacement to a ScheduledExecutorService with one thread to avoid a lot of threads created, idling
* most of the time and wasting memory on low-end devices.
*
* The mechanism to block the ScheduledExecutorService to run tasks concurrently is based on a chain of
* {@link CompletableFuture}s.
* Each instance has a reference to the last CompletableFuture and will call handleAsync to add a new task.
*
* @author Jörg Sautter - Initial contribution
*/
@NonNullByDefault
class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService {
private final WorkQueueEntry empty;
private final ScheduledThreadPoolExecutor pool;
private final List<RunnableFuture<?>> scheduled;
private final ScheduledFuture<?> cleaner;
private @Nullable WorkQueueEntry tail;
public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor pool) {
if (pool.getMaximumPoolSize() != Integer.MAX_VALUE) {
throw new IllegalArgumentException("the pool must scale unlimited to avoid potential dead locks!");
}
this.pool = pool;
// prepare the WorkQueueEntry we are using when no tasks are pending
RunnableCompletableFuture<?> future = new RunnableCompletableFuture<>();
future.complete(null);
empty = new WorkQueueEntry(null, null, future);
// tracks scheduled tasks alive
this.scheduled = new ArrayList<>();
tail = empty;
// clean up to ensure we do not keep references to old tasks
cleaner = this.scheduleWithFixedDelay(() -> {
synchronized (this) {
scheduled.removeIf((sf) -> sf.isCancelled());
if (tail == null) {
// the service is shutdown
return;
}
WorkQueueEntry entry = tail;
while (entry.prev != null) {
if (entry.prev.future.isDone()) {
entry.prev = null;
break;
}
entry = entry.prev;
}
if (tail != empty && tail.future.isDone()) {
// replace the tail with empty to ensure we do not prevent GC
tail = empty;
}
}
}, 2, 4, TimeUnit.SECONDS);
}
@Override
public ScheduledFuture<?> schedule(@Nullable Runnable command, long delay, @Nullable TimeUnit unit) {
return schedule((origin) -> pool.schedule(() -> {
// we block the thread here, in worst case new threads are spawned
submitToWorkQueue(origin.join(), command).join();
}, delay, unit));
}
@Override
public <V> ScheduledFuture<V> schedule(@Nullable Callable<V> callable, long delay, @Nullable TimeUnit unit) {
return schedule((origin) -> pool.schedule(() -> {
// we block the thread here, in worst case new threads are spawned
return submitToWorkQueue(origin.join(), callable).join();
}, delay, unit));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(@Nullable Runnable command, long initialDelay, long period,
@Nullable TimeUnit unit) {
return schedule((origin) -> pool.scheduleAtFixedRate(() -> {
CompletableFuture<?> submitted;
try {
// we block the thread here, in worst case new threads are spawned
submitted = submitToWorkQueue(origin.join(), command);
} catch (RejectedExecutionException ex) {
// the pool has been shutdown, scheduled tasks should cancel
return;
}
submitted.join();
}, initialDelay, period, unit));
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(@Nullable Runnable command, long initialDelay, long delay,
@Nullable TimeUnit unit) {
return schedule((origin) -> pool.scheduleWithFixedDelay(() -> {
CompletableFuture<?> submitted;
try {
// we block the thread here, in worst case new threads are spawned
submitted = submitToWorkQueue(origin.join(), command);
} catch (RejectedExecutionException ex) {
// the pool has been shutdown, scheduled tasks should cancel
return;
}
submitted.join();
}, initialDelay, delay, unit));
}
private <V> ScheduledFuture<V> schedule(
Function<CompletableFuture<RunnableFuture<?>>, ScheduledFuture<V>> doSchedule) {
synchronized (this) {
if (tail == null) {
throw new RejectedExecutionException("this scheduled executor has been shutdown before");
}
CompletableFuture<RunnableFuture<?>> origin = new CompletableFuture<>();
ScheduledFuture<V> future = doSchedule.apply(origin);
scheduled.add((RunnableFuture<?>) future);
origin.complete((RunnableFuture<?>) future);
return future;
}
}
@Override
public void shutdown() {
synchronized (this) {
cleaner.cancel(false);
scheduled.removeIf((sf) -> {
sf.cancel(false);
return true;
});
tail = null;
}
}
@Override
@NonNullByDefault({})
public List<Runnable> shutdownNow() {
synchronized (this) {
if (tail == null) {
return List.of();
}
// ensures we do not leak the internal cleaner as Runnable
cleaner.cancel(false);
Set<@Nullable Runnable> runnables = Collections.newSetFromMap(new IdentityHashMap<>());
WorkQueueEntry entry = tail;
scheduled.removeIf((sf) -> {
if (sf.cancel(false)) {
runnables.add(sf);
}
return true;
});
tail = null;
while (entry != null) {
if (!entry.future.cancel(false)) {
break;
}
if (entry.origin != null) {
// entry has been submitted by a .schedule call
runnables.add(entry.origin);
} else {
// entry has been submitted by a .submit call
runnables.add(entry.future);
}
entry = entry.prev;
}
return List.copyOf(runnables);
}
}
@Override
public boolean isShutdown() {
synchronized (this) {
return pool == null;
}
}
@Override
public boolean isTerminated() {
synchronized (this) {
return pool == null && tail.future.isDone();
}
}
@Override
public boolean awaitTermination(long timeout, @Nullable TimeUnit unit) throws InterruptedException {
long timeoutAt = System.currentTimeMillis() + unit.toMillis(timeout);
while (!isTerminated()) {
if (System.currentTimeMillis() > timeoutAt) {
return false;
}
Thread.onSpinWait();
}
return true;
}
@Override
public <T> Future<T> submit(@Nullable Callable<T> task) {
return submitToWorkQueue(null, task);
}
private CompletableFuture<?> submitToWorkQueue(RunnableFuture<?> origin, @Nullable Runnable task) {
Callable<?> callable = () -> {
task.run();
return null;
};
return submitToWorkQueue(origin, callable);
}
private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> origin, @Nullable Callable<T> task) {
BiFunction<? super Object, Throwable, T> action = (result, error) -> {
// ignore result & error, they are from the previous task
try {
return task.call();
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
// a small hack to throw the Exception unchecked
throw PoolBasedSequentialScheduledExecutorService.unchecked(ex);
}
};
synchronized (this) {
if (tail == null) {
throw new RejectedExecutionException("this scheduled executor has been shutdown before");
}
RunnableCompletableFuture<T> cf = tail.future.handleAsync(action, pool);
cf.setCallable(task);
tail = new WorkQueueEntry(tail, origin, cf);
return cf;
}
}
private static <E extends RuntimeException> E unchecked(Exception ex) throws E {
throw (E) ex;
}
@Override
public <T> Future<T> submit(@Nullable Runnable task, T result) {
return submitToWorkQueue(null, () -> {
task.run();
return result;
});
}
@Override
public Future<?> submit(@Nullable Runnable task) {
return submit(task, (Void) null);
}
@Override
@NonNullByDefault({})
public <T> List<Future<T>> invokeAll(@Nullable Collection<? extends @Nullable Callable<T>> tasks)
throws InterruptedException {
List<Future<T>> futures = new ArrayList<>();
for (Callable<T> task : tasks) {
futures.add(submit(task));
}
// wait for all futures to complete
for (Future<T> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
// ignore, we are just waiting here for the futures to complete
}
}
return futures;
}
@Override
@NonNullByDefault({})
public <T> List<Future<T>> invokeAll(@Nullable Collection<? extends @Nullable Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException {
List<Future<T>> futures = new ArrayList<>();
for (Callable<T> task : tasks) {
futures.add(submitToWorkQueue(null, task).orTimeout(timeout, unit));
}
// wait for all futures to complete
for (Future<T> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
// ignore, we are just waiting here for the futures to complete
}
}
return futures;
}
@Override
public <T> T invokeAny(@Nullable Collection<? extends @Nullable Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return invokeAny(tasks, Long.MAX_VALUE);
} catch (TimeoutException ex) {
throw new AssertionError(ex);
}
}
@Override
public <T> T invokeAny(@Nullable Collection<? extends @Nullable Callable<T>> tasks, long timeout,
@Nullable TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long timeoutAt = System.currentTimeMillis() + unit.toMillis(timeout);
return invokeAny(tasks, timeoutAt);
}
private <T> T invokeAny(@Nullable Collection<? extends @Nullable Callable<T>> tasks, long timeoutAt)
throws InterruptedException, ExecutionException, TimeoutException {
List<CompletableFuture<T>> futures = new ArrayList<>();
for (Callable<T> task : tasks) {
futures.add(submitToWorkQueue(null, task));
}
// wait for any future to complete
while (timeoutAt >= System.currentTimeMillis()) {
boolean allDone = true;
for (CompletableFuture<T> future : futures) {
if (future.isDone()) {
if (!future.isCompletedExceptionally()) {
// stop the others
for (CompletableFuture<T> tooLate : futures) {
if (tooLate != future) {
tooLate.cancel(true);
}
}
return future.join();
}
} else {
allDone = false;
}
}
if (allDone) {
ExecutionException exe = new ExecutionException("all tasks failed", null);
for (CompletableFuture<T> future : futures) {
try {
future.get();
throw new AssertionError("all tasks should be failed");
} catch (ExecutionException ex) {
exe.addSuppressed(ex);
}
}
throw exe;
}
Thread.onSpinWait();
}
for (CompletableFuture<T> tooLate : futures) {
tooLate.cancel(true);
}
throw new TimeoutException("none of the tasks did complete in time");
}
@Override
public void execute(Runnable command) {
submit(command);
}
static class WorkQueueEntry {
private @Nullable WorkQueueEntry prev;
private @Nullable RunnableFuture<?> origin;
private final RunnableCompletableFuture<?> future;
public WorkQueueEntry(@Nullable WorkQueueEntry prev, @Nullable RunnableFuture<?> origin,
RunnableCompletableFuture<?> future) {
this.prev = prev;
this.origin = origin;
this.future = future;
}
}
static class RunnableCompletableFuture<V> extends CompletableFuture<V> implements RunnableFuture<V> {
private @Nullable Callable<V> callable;
public RunnableCompletableFuture() {
callable = null;
}
public void setCallable(@Nullable Callable<V> callable) {
this.callable = callable;
}
@Override
public <U> RunnableCompletableFuture<U> newIncompleteFuture() {
return new RunnableCompletableFuture<>();
}
@Override
public <U> RunnableCompletableFuture<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn,
Executor executor) {
return (RunnableCompletableFuture<U>) super.handleAsync(fn, executor);
}
@Override
public void run() {
if (this.isDone()) {
// a FutureTask does also return here without exception
return;
}
try {
this.complete(callable.call());
} catch (Error | Exception t) {
this.completeExceptionally(t);
}
}
}
}

View File

@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -115,6 +116,27 @@ public class ThreadPoolManager {
}
}
/**
* Returns an instance of a scheduled service, which will sequentially execute submitted tasks. If a task is
* currently running the task is queued until the previous one is completed, this also applies for scheduled tasks.
* The service might execute submitted task might in different threads, but still one after the other.
* If it is the first request for the given pool name and a pool is used, the instance is newly created.
*
* @param poolName a short name used to identify the pool, if a thread pool is used e.g. "bluetooth-discovery"
* @param threadName a short name used to identify the thread if no thread pool is used, e.g. "bluetooth"
* @return an instance to use
*/
public static ScheduledExecutorService getPoolBasedSequentialScheduledExecutorService(String poolName,
String threadName) {
if (configs.getOrDefault(poolName, 0) > 0) {
ScheduledThreadPoolExecutor pool = getScheduledPoolUnwrapped(poolName);
return new PoolBasedSequentialScheduledExecutorService((ScheduledThreadPoolExecutor) pool);
} else {
return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(threadName));
}
}
/**
* Returns an instance of a scheduled thread pool service. If it is the first request for the given pool name, the
* instance is newly created.

View File

@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.common.NamedThreadFactory;
import org.openhab.core.common.ThreadPoolManager;
import org.openhab.core.events.Event;
import org.openhab.core.events.EventFactory;
import org.openhab.core.events.EventFilter;
@ -69,9 +70,8 @@ public class EventHandler implements AutoCloseable {
}
private synchronized ExecutorRecord createExecutorRecord(Class<? extends EventSubscriber> subscriber) {
return new ExecutorRecord(
Executors.newSingleThreadExecutor(new NamedThreadFactory("eventexecutor-" + executors.size())),
new AtomicInteger());
return new ExecutorRecord(ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("events",
"eventexecutor-" + executors.size()), new AtomicInteger());
}
@Override

View File

@ -17,13 +17,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
@ -35,6 +42,138 @@ import org.junit.jupiter.api.Test;
@NonNullByDefault
public class ThreadPoolManagerTest {
@BeforeAll
public static void enableSequentialScheduledExecutorService() {
ThreadPoolManager manager = new ThreadPoolManager();
manager.activate(Map.of("unit-test", "10"));
}
@Test
public void testSequentialExecutorServiceAssumptions() {
Callable<Boolean> callable = () -> true;
Runnable runnable = () -> {
};
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
assertTrue(service.submit(runnable) instanceof RunnableFuture);
assertTrue(service.submit(callable) instanceof RunnableFuture);
assertTrue(service.schedule(runnable, 1, TimeUnit.SECONDS) instanceof RunnableFuture);
assertTrue(service.schedule(callable, 1, TimeUnit.SECONDS) instanceof RunnableFuture);
var fixedRate = service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
try {
assertTrue(fixedRate instanceof RunnableFuture);
} finally {
fixedRate.cancel(false);
}
var fixedDelay = service.scheduleWithFixedDelay(runnable, 1, 1, TimeUnit.SECONDS);
try {
assertTrue(fixedDelay instanceof RunnableFuture);
} finally {
fixedDelay.cancel(false);
}
service.shutdownNow();
}
@Test
public void testExecutionIsSequentialInSequentialExecutorService() {
ScheduledExecutorService service = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("unit-test",
"thread-1");
AtomicBoolean done = new AtomicBoolean(false);
service.submit(() -> {
Thread.sleep(100);
done.set(true);
return null;
});
assertTrue(((CompletableFuture<Boolean>) service.submit(() -> done.get())).join());
}
@Test
public void testCancelDoesNotStopProcessingInSequentialExecutorService() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ScheduledExecutorService service = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("unit-test",
"thread-2");
service.submit(() -> {
Thread.sleep(100);
return null;
});
service.submit(() -> null).cancel(false);
service.submit(() -> {
latch.countDown();
});
assertTrue(latch.await(1, TimeUnit.SECONDS));
}
@Test
public void testInstancesDoNotBlockEachOtherInSequentialExecutorService() throws InterruptedException {
ScheduledExecutorService serviceA = ThreadPoolManager
.getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-3");
ScheduledExecutorService serviceB = ThreadPoolManager
.getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-4");
serviceA.submit(() -> {
Thread.sleep(100);
return null;
});
CountDownLatch latch = new CountDownLatch(1);
serviceB.submit(() -> {
latch.countDown();
});
assertTrue(latch.await(800, TimeUnit.MILLISECONDS));
}
@Test
public void testSchedulingWorksInSequentialExecutorService() throws InterruptedException {
ScheduledExecutorService service = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("unit-test",
"thread-5");
CountDownLatch latch = new CountDownLatch(1);
service.schedule(() -> {
latch.countDown();
}, 200, TimeUnit.MILLISECONDS);
assertTrue(latch.await(1, TimeUnit.SECONDS));
}
@Test
public void testSchedulingGetsBlockedByRegularTaskInSequentialExecutorService() throws InterruptedException {
ScheduledExecutorService service = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("unit-test",
"thread-6");
CountDownLatch latch = new CountDownLatch(1);
service.submit(() -> {
Thread.sleep(200);
return null;
});
service.schedule(() -> {
latch.countDown();
}, 20, TimeUnit.MILLISECONDS);
assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
}
@Test
public void testGetScheduledPool() {
ThreadPoolExecutor result = ThreadPoolManager.getScheduledPoolUnwrapped("test1");
@ -136,4 +275,10 @@ public class ThreadPoolManagerTest {
assertTrue(cdl.await(5, TimeUnit.SECONDS), "Checking if thread pool " + poolName + " works");
assertFalse(threadPool.isShutdown(), "Checking if thread pool is not shut down");
}
@AfterAll
public static void disableSequentialScheduledExecutorService() {
ThreadPoolManager manager = new ThreadPoolManager();
manager.activate(Map.of("unit-test", "0"));
}
}

View File

@ -15,3 +15,4 @@ org.openhab.core.automation.internal.RuleEngineImpl=AvoidCatchingThrowable
org.openhab.core.automation.internal.RuleRegistryImpl=CompareObjectsWithEquals
org.openhab.core.automation.internal.provider.AutomationResourceBundlesEventQueue=AvoidCatchingThrowable
org.openhab.core.io.console.karaf.internal.InstallServiceCommand=SystemPrintln
org.openhab.core.common.PoolBasedSequentialScheduledExecutorService=CompareObjectsWithEquals