mirror of
https://github.com/danieldemus/openhab-core.git
synced 2025-01-10 21:31:53 +01:00
Do not leak running pools from the internal collection (#3885)
Signed-off-by: Jörg Sautter <joerg.sautter@gmx.net>
This commit is contained in:
parent
f71ebfb83c
commit
32237a9bdc
@ -18,7 +18,6 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.WeakHashMap;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
@ -74,7 +73,7 @@ public class ThreadPoolManager {
|
|||||||
protected static final long THREAD_TIMEOUT = 65L;
|
protected static final long THREAD_TIMEOUT = 65L;
|
||||||
protected static final long THREAD_MONITOR_SLEEP = 60000;
|
protected static final long THREAD_MONITOR_SLEEP = 60000;
|
||||||
|
|
||||||
protected static Map<String, ExecutorService> pools = new WeakHashMap<>();
|
protected static Map<String, ExecutorService> pools = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private static Map<String, Integer> configs = new ConcurrentHashMap<>();
|
private static Map<String, Integer> configs = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@ -124,23 +123,17 @@ public class ThreadPoolManager {
|
|||||||
* @return an instance to use
|
* @return an instance to use
|
||||||
*/
|
*/
|
||||||
public static ScheduledExecutorService getScheduledPool(String poolName) {
|
public static ScheduledExecutorService getScheduledPool(String poolName) {
|
||||||
ExecutorService pool = pools.get(poolName);
|
ExecutorService pool = pools.computeIfAbsent(poolName, (name) -> {
|
||||||
if (pool == null) {
|
int cfg = getConfig(name);
|
||||||
synchronized (pools) {
|
ScheduledThreadPoolExecutor executor = new WrappedScheduledExecutorService(cfg,
|
||||||
// do a double check if it is still null or if another thread might have created it meanwhile
|
new NamedThreadFactory(name, true, Thread.NORM_PRIORITY));
|
||||||
pool = pools.get(poolName);
|
executor.setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
|
||||||
if (pool == null) {
|
executor.allowCoreThreadTimeOut(true);
|
||||||
int cfg = getConfig(poolName);
|
executor.setRemoveOnCancelPolicy(true);
|
||||||
pool = new WrappedScheduledExecutorService(cfg,
|
LOGGER.debug("Created scheduled thread pool '{}' of size {}", name, cfg);
|
||||||
new NamedThreadFactory(poolName, true, Thread.NORM_PRIORITY));
|
return executor;
|
||||||
((ThreadPoolExecutor) pool).setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
|
});
|
||||||
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
|
|
||||||
((ScheduledThreadPoolExecutor) pool).setRemoveOnCancelPolicy(true);
|
|
||||||
pools.put(poolName, pool);
|
|
||||||
LOGGER.debug("Created scheduled thread pool '{}' of size {}", poolName, cfg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pool instanceof ScheduledExecutorService service) {
|
if (pool instanceof ScheduledExecutorService service) {
|
||||||
return new UnstoppableScheduledExecutorService(poolName, service);
|
return new UnstoppableScheduledExecutorService(poolName, service);
|
||||||
} else {
|
} else {
|
||||||
@ -156,21 +149,15 @@ public class ThreadPoolManager {
|
|||||||
* @return an instance to use
|
* @return an instance to use
|
||||||
*/
|
*/
|
||||||
public static ExecutorService getPool(String poolName) {
|
public static ExecutorService getPool(String poolName) {
|
||||||
ExecutorService pool = pools.get(poolName);
|
ExecutorService pool = pools.computeIfAbsent(poolName, (name) -> {
|
||||||
if (pool == null) {
|
int cfg = getConfig(name);
|
||||||
synchronized (pools) {
|
ThreadPoolExecutor executor = QueueingThreadPoolExecutor.createInstance(name, cfg);
|
||||||
// do a double check if it is still null or if another thread might have created it meanwhile
|
executor.setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
|
||||||
pool = pools.get(poolName);
|
executor.allowCoreThreadTimeOut(true);
|
||||||
if (pool == null) {
|
LOGGER.debug("Created thread pool '{}' with size {}", name, cfg);
|
||||||
int cfg = getConfig(poolName);
|
return executor;
|
||||||
pool = QueueingThreadPoolExecutor.createInstance(poolName, cfg);
|
});
|
||||||
((ThreadPoolExecutor) pool).setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
|
|
||||||
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
|
|
||||||
pools.put(poolName, pool);
|
|
||||||
LOGGER.debug("Created thread pool '{}' with size {}", poolName, cfg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return new UnstoppableExecutorService<>(poolName, pool);
|
return new UnstoppableExecutorService<>(poolName, pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,9 +166,9 @@ public class ThreadPoolManager {
|
|||||||
return (ThreadPoolExecutor) ret.getDelegate();
|
return (ThreadPoolExecutor) ret.getDelegate();
|
||||||
}
|
}
|
||||||
|
|
||||||
static ThreadPoolExecutor getScheduledPoolUnwrapped(String poolName) {
|
static ScheduledThreadPoolExecutor getScheduledPoolUnwrapped(String poolName) {
|
||||||
UnstoppableExecutorService<?> ret = (UnstoppableScheduledExecutorService) getScheduledPool(poolName);
|
UnstoppableExecutorService<?> ret = (UnstoppableScheduledExecutorService) getScheduledPool(poolName);
|
||||||
return (ThreadPoolExecutor) ret.getDelegate();
|
return (ScheduledThreadPoolExecutor) ret.getDelegate();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static int getConfig(String poolName) {
|
protected static int getConfig(String poolName) {
|
||||||
|
Loading…
Reference in New Issue
Block a user