From 32237a9bdc861e94554fc3c56ec8be02490744eb Mon Sep 17 00:00:00 2001 From: joerg1985 <16140691+joerg1985@users.noreply.github.com> Date: Fri, 24 Nov 2023 22:24:37 +0100 Subject: [PATCH] Do not leak running pools from the internal collection (#3885) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörg Sautter --- .../core/common/ThreadPoolManager.java | 59 ++++++++----------- 1 file changed, 23 insertions(+), 36 deletions(-) diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java index 42e3076f6..69c51d298 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.WeakHashMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -74,7 +73,7 @@ public class ThreadPoolManager { protected static final long THREAD_TIMEOUT = 65L; protected static final long THREAD_MONITOR_SLEEP = 60000; - protected static Map pools = new WeakHashMap<>(); + protected static Map pools = new ConcurrentHashMap<>(); private static Map configs = new ConcurrentHashMap<>(); @@ -124,23 +123,17 @@ public class ThreadPoolManager { * @return an instance to use */ public static ScheduledExecutorService getScheduledPool(String poolName) { - ExecutorService pool = pools.get(poolName); - if (pool == null) { - synchronized (pools) { - // do a double check if it is still null or if another thread might have created it meanwhile - pool = pools.get(poolName); - if (pool == null) { - int cfg = getConfig(poolName); - pool = new WrappedScheduledExecutorService(cfg, - new NamedThreadFactory(poolName, true, Thread.NORM_PRIORITY)); - ((ThreadPoolExecutor) pool).setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS); - ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); - ((ScheduledThreadPoolExecutor) pool).setRemoveOnCancelPolicy(true); - pools.put(poolName, pool); - LOGGER.debug("Created scheduled thread pool '{}' of size {}", poolName, cfg); - } - } - } + ExecutorService pool = pools.computeIfAbsent(poolName, (name) -> { + int cfg = getConfig(name); + ScheduledThreadPoolExecutor executor = new WrappedScheduledExecutorService(cfg, + new NamedThreadFactory(name, true, Thread.NORM_PRIORITY)); + executor.setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + executor.setRemoveOnCancelPolicy(true); + LOGGER.debug("Created scheduled thread pool '{}' of size {}", name, cfg); + return executor; + }); + if (pool instanceof ScheduledExecutorService service) { return new UnstoppableScheduledExecutorService(poolName, service); } else { @@ -156,21 +149,15 @@ public class ThreadPoolManager { * @return an instance to use */ public static ExecutorService getPool(String poolName) { - ExecutorService pool = pools.get(poolName); - if (pool == null) { - synchronized (pools) { - // do a double check if it is still null or if another thread might have created it meanwhile - pool = pools.get(poolName); - if (pool == null) { - int cfg = getConfig(poolName); - pool = QueueingThreadPoolExecutor.createInstance(poolName, cfg); - ((ThreadPoolExecutor) pool).setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS); - ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); - pools.put(poolName, pool); - LOGGER.debug("Created thread pool '{}' with size {}", poolName, cfg); - } - } - } + ExecutorService pool = pools.computeIfAbsent(poolName, (name) -> { + int cfg = getConfig(name); + ThreadPoolExecutor executor = QueueingThreadPoolExecutor.createInstance(name, cfg); + executor.setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + LOGGER.debug("Created thread pool '{}' with size {}", name, cfg); + return executor; + }); + return new UnstoppableExecutorService<>(poolName, pool); } @@ -179,9 +166,9 @@ public class ThreadPoolManager { return (ThreadPoolExecutor) ret.getDelegate(); } - static ThreadPoolExecutor getScheduledPoolUnwrapped(String poolName) { + static ScheduledThreadPoolExecutor getScheduledPoolUnwrapped(String poolName) { UnstoppableExecutorService ret = (UnstoppableScheduledExecutorService) getScheduledPool(poolName); - return (ThreadPoolExecutor) ret.getDelegate(); + return (ScheduledThreadPoolExecutor) ret.getDelegate(); } protected static int getConfig(String poolName) {