[insteon] Convert legacy threads to use scheduler service (#17904)

Signed-off-by: jsetton <jeremy.setton@gmail.com>
This commit is contained in:
Jeremy 2024-12-18 14:19:18 -05:00 committed by GitHub
parent 29915c434c
commit a94e4a1c50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 147 additions and 211 deletions

View File

@ -36,8 +36,6 @@ import org.openhab.binding.insteon.internal.device.LegacyDevice.DeviceStatus;
import org.openhab.binding.insteon.internal.device.LegacyDeviceFeature;
import org.openhab.binding.insteon.internal.device.LegacyDeviceType;
import org.openhab.binding.insteon.internal.device.LegacyDeviceTypeLoader;
import org.openhab.binding.insteon.internal.device.LegacyPollManager;
import org.openhab.binding.insteon.internal.device.LegacyRequestManager;
import org.openhab.binding.insteon.internal.device.X10Address;
import org.openhab.binding.insteon.internal.device.database.LegacyModemDBEntry;
import org.openhab.binding.insteon.internal.device.feature.LegacyFeatureListener;
@ -269,7 +267,7 @@ public class InsteonLegacyBinding implements LegacyDriverListener, LegacyPortLis
int ndev = checkIfInModemDatabase(device);
if (device.hasModemDBEntry()) {
device.setStatus(DeviceStatus.POLLING);
LegacyPollManager.instance().startPolling(device, ndev);
driver.getPollManager().startPolling(device, ndev);
}
}
devices.put(address, device);
@ -286,7 +284,7 @@ public class InsteonLegacyBinding implements LegacyDriverListener, LegacyPortLis
}
if (device.getStatus() == DeviceStatus.POLLING) {
LegacyPollManager.instance().stopPolling(device);
driver.getPollManager().stopPolling(device);
}
}
@ -350,8 +348,6 @@ public class InsteonLegacyBinding implements LegacyDriverListener, LegacyPortLis
logger.debug("shutting down Insteon bridge");
driver.stop();
devices.clear();
LegacyRequestManager.destroyInstance();
LegacyPollManager.instance().stop();
isActive = false;
}
@ -427,7 +423,7 @@ public class InsteonLegacyBinding implements LegacyDriverListener, LegacyPortLis
public void logDeviceStatistics() {
String msg = String.format("devices: %3d configured, %3d polling, msgs received: %5d", devices.size(),
LegacyPollManager.instance().getSizeOfQueue(), messagesReceived);
driver.getPollManager().getSizeOfQueue(), messagesReceived);
logger.debug("{}", msg);
messagesReceived = 0;
for (LegacyDevice device : devices.values()) {
@ -485,7 +481,7 @@ public class InsteonLegacyBinding implements LegacyDriverListener, LegacyPortLis
device.setHasModemDBEntry(true);
}
if (device.getStatus() != DeviceStatus.POLLING) {
LegacyPollManager.instance().startPolling(device, dbes.size());
driver.getPollManager().startPolling(device, dbes.size());
}
}
}

View File

@ -263,11 +263,9 @@ public class LegacyDevice {
mrequestQueue.add(qe);
}
}
LegacyRequestManager instance = LegacyRequestManager.instance();
if (instance != null) {
instance.addQueue(this, now + delay);
} else {
logger.warn("request queue manager is null");
LegacyDriver driver = this.driver;
if (driver != null) {
driver.getRequestManager().addQueue(this, now + delay);
}
if (!list.isEmpty()) {
@ -388,11 +386,9 @@ public class LegacyDevice {
msg.setQuietTime(QUIET_TIME_DIRECT_MESSAGE);
}
logger.trace("enqueing direct message with delay {}", delay);
LegacyRequestManager instance = LegacyRequestManager.instance();
if (instance != null) {
instance.addQueue(this, now + delay);
} else {
logger.warn("request queue manger instance is null");
LegacyDriver driver = this.driver;
if (driver != null) {
driver.getRequestManager().addQueue(this, now + delay);
}
}

View File

@ -16,10 +16,12 @@ import java.sql.Date;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.insteon.internal.InsteonBindingConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,16 +47,16 @@ public class LegacyPollManager {
private static final long MIN_MSEC_BETWEEN_POLLS = 2000L;
private final Logger logger = LoggerFactory.getLogger(LegacyPollManager.class);
private static LegacyPollManager poller = new LegacyPollManager(); // for singleton
private @Nullable Thread pollThread = null;
private ScheduledExecutorService scheduler;
private @Nullable ScheduledFuture<?> job;
private TreeSet<PQEntry> pollQueue = new TreeSet<>();
private boolean keepRunning = true;
/**
* Constructor
*/
private LegacyPollManager() {
public LegacyPollManager(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}
/**
@ -104,17 +106,8 @@ public class LegacyPollManager {
* Starts the poller thread
*/
public void start() {
if (pollThread == null) {
pollThread = new Thread(new PollQueueReader());
setParamsAndStart(pollThread);
}
}
private void setParamsAndStart(@Nullable Thread thread) {
if (thread != null) {
thread.setName("OH-binding-" + InsteonBindingConstants.BINDING_ID + "-pollQueueReader");
thread.setDaemon(true);
thread.start();
if (job == null) {
job = scheduler.schedule(new PollQueueReader(), 0, TimeUnit.SECONDS);
}
}
@ -122,21 +115,10 @@ public class LegacyPollManager {
* Stops the poller thread
*/
public void stop() {
logger.debug("stopping poller!");
synchronized (pollQueue) {
pollQueue.clear();
keepRunning = false;
pollQueue.notify();
}
try {
Thread pollThread = this.pollThread;
if (pollThread != null) {
pollThread.join();
this.pollThread = null;
}
keepRunning = true;
} catch (InterruptedException e) {
logger.debug("got interrupted on exit: {}", e.getMessage());
ScheduledFuture<?> job = this.job;
if (job != null) {
job.cancel(true);
this.job = null;
}
}
@ -204,18 +186,17 @@ public class LegacyPollManager {
private class PollQueueReader implements Runnable {
@Override
public void run() {
logger.debug("starting poll thread.");
synchronized (pollQueue) {
while (keepRunning) {
try {
logger.debug("starting poll queue thread");
try {
while (!Thread.interrupted()) {
synchronized (pollQueue) {
readPollQueue();
} catch (InterruptedException e) {
logger.warn("poll queue reader thread interrupted!");
break;
}
}
} catch (InterruptedException e) {
logger.trace("poll queue thread interrupted!");
}
logger.debug("poll thread exiting");
logger.debug("exiting poll queue thread!");
}
/**
@ -225,10 +206,9 @@ public class LegacyPollManager {
* @throws InterruptedException
*/
private void readPollQueue() throws InterruptedException {
while (pollQueue.isEmpty() && keepRunning) {
if (pollQueue.isEmpty()) {
logger.trace("waiting for poll queue to fill");
pollQueue.wait();
}
if (!keepRunning) {
return;
}
// something is in the queue
@ -297,14 +277,4 @@ public class LegacyPollManager {
return device.getAddress().toString() + "/" + String.format("%tc", new Date(expirationTime));
}
}
/**
* Singleton pattern instance() method
*
* @return the poller instance
*/
public static synchronized LegacyPollManager instance() {
poller.start();
return poller;
}
}

View File

@ -16,10 +16,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.insteon.internal.InsteonBindingConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,24 +41,18 @@ import org.slf4j.LoggerFactory;
*/
@NonNullByDefault
public class LegacyRequestManager {
private static @Nullable LegacyRequestManager instance = null;
private final Logger logger = LoggerFactory.getLogger(LegacyRequestManager.class);
private @Nullable Thread queueThread = null;
private ScheduledExecutorService scheduler;
private @Nullable ScheduledFuture<?> job;
private Queue<RequestQueue> requestQueues = new PriorityQueue<>();
private Map<LegacyDevice, RequestQueue> requestQueueHash = new HashMap<>();
private boolean keepRunning = true;
private LegacyRequestManager() {
queueThread = new Thread(new RequestQueueReader());
setParamsAndStart(queueThread);
}
private void setParamsAndStart(@Nullable Thread thread) {
if (thread != null) {
thread.setName("OH-binding-" + InsteonBindingConstants.BINDING_ID + "-requestQueueReader");
thread.setDaemon(true);
thread.start();
}
/**
* Constructor
*/
public LegacyRequestManager(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}
/**
@ -92,25 +88,23 @@ public class LegacyRequestManager {
}
}
/**
* Starts request queue thread
*/
public void start() {
if (job == null) {
job = scheduler.schedule(new RequestQueueReader(), 0, TimeUnit.SECONDS);
}
}
/**
* Stops request queue thread
*/
private void stopThread() {
logger.debug("stopping thread");
Thread queueThread = this.queueThread;
if (queueThread != null) {
synchronized (requestQueues) {
keepRunning = false;
requestQueues.notifyAll();
}
try {
logger.debug("waiting for thread to join");
queueThread.join();
logger.debug("request queue thread exited!");
} catch (InterruptedException e) {
logger.warn("got interrupted waiting for thread exit ", e);
}
this.queueThread = null;
public void stop() {
ScheduledFuture<?> job = this.job;
if (job != null) {
job.cancel(true);
this.job = null;
}
}
@ -118,11 +112,16 @@ public class LegacyRequestManager {
@Override
public void run() {
logger.debug("starting request queue thread");
synchronized (requestQueues) {
while (keepRunning) {
try {
RequestQueue queue;
while (keepRunning && (queue = requestQueues.peek()) != null) {
try {
while (!Thread.interrupted()) {
synchronized (requestQueues) {
if (requestQueues.isEmpty()) {
logger.trace("waiting for request queues to fill");
requestQueues.wait();
continue;
}
RequestQueue queue = requestQueues.peek();
if (queue != null) {
long now = System.currentTimeMillis();
long expTime = queue.getExpirationTime();
LegacyDevice device = queue.getDevice();
@ -150,13 +149,10 @@ public class LegacyRequestManager {
logger.debug("device queue for {} is empty!", device.getAddress());
}
}
logger.trace("waiting for request queues to fill");
requestQueues.wait();
} catch (InterruptedException e) {
logger.warn("request queue thread got interrupted, breaking..", e);
break;
}
}
} catch (InterruptedException e) {
logger.trace("request queue thread interrupted!");
}
logger.debug("exiting request queue thread!");
}
@ -188,19 +184,4 @@ public class LegacyRequestManager {
return (int) (expirationTime - queue.expirationTime);
}
}
public static synchronized @Nullable LegacyRequestManager instance() {
if (instance == null) {
instance = new LegacyRequestManager();
}
return instance;
}
public static synchronized void destroyInstance() {
LegacyRequestManager instance = LegacyRequestManager.instance;
if (instance != null) {
instance.stopThread();
LegacyRequestManager.instance = null;
}
}
}

View File

@ -224,7 +224,7 @@ public class PollManager {
}
}
} catch (InterruptedException e) {
logger.debug("poll queue thread interrupted!");
logger.trace("poll queue thread interrupted!");
}
logger.debug("exiting poll queue thread!");
}

View File

@ -181,7 +181,7 @@ public class RequestManager {
}
}
} catch (InterruptedException e) {
logger.debug("request queue thread interrupted!");
logger.trace("request queue thread interrupted!");
}
logger.debug("exiting request queue thread!");
}

View File

@ -22,6 +22,8 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.insteon.internal.config.InsteonLegacyNetworkConfiguration;
import org.openhab.binding.insteon.internal.device.InsteonAddress;
import org.openhab.binding.insteon.internal.device.LegacyPollManager;
import org.openhab.binding.insteon.internal.device.LegacyRequestManager;
import org.openhab.binding.insteon.internal.device.database.LegacyModemDBEntry;
import org.openhab.binding.insteon.internal.transport.message.Msg;
import org.openhab.core.io.transport.serial.SerialPortManager;
@ -37,6 +39,8 @@ import org.openhab.core.io.transport.serial.SerialPortManager;
public class LegacyDriver {
private LegacyPort port;
private LegacyDriverListener listener;
private LegacyPollManager poller;
private LegacyRequestManager requester;
private Map<InsteonAddress, LegacyModemDBEntry> modemDBEntries = new HashMap<>();
private ReentrantLock modemDBEntriesLock = new ReentrantLock();
@ -45,6 +49,8 @@ public class LegacyDriver {
this.listener = listener;
this.port = new LegacyPort(config, this, serialPortManager, scheduler);
this.poller = new LegacyPollManager(scheduler);
this.requester = new LegacyRequestManager(scheduler);
}
public boolean isReady() {
@ -70,10 +76,14 @@ public class LegacyDriver {
public void start() {
port.start();
poller.start();
requester.start();
}
public void stop() {
port.stop();
poller.stop();
requester.stop();
}
public void writeMessage(Msg m) throws IOException {
@ -105,4 +115,12 @@ public class LegacyDriver {
public void disconnected() {
listener.disconnected();
}
public LegacyPollManager getPollManager() {
return poller;
}
public LegacyRequestManager getRequestManager() {
return requester;
}
}

View File

@ -18,11 +18,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.insteon.internal.InsteonBindingConstants;
import org.openhab.binding.insteon.internal.InsteonLegacyBindingConstants;
import org.openhab.binding.insteon.internal.config.InsteonLegacyNetworkConfiguration;
import org.openhab.binding.insteon.internal.device.InsteonAddress;
@ -74,11 +75,11 @@ public class LegacyPort {
private IOStream ioStream;
private String name;
private Modem modem;
private ScheduledExecutorService scheduler;
private IOStreamReader reader;
private IOStreamWriter writer;
private final int readSize = 1024; // read buffer size
private @Nullable Thread readThread = null;
private @Nullable Thread writeThread = null;
private @Nullable ScheduledFuture<?> readJob;
private @Nullable ScheduledFuture<?> writeJob;
private boolean running = false;
private boolean modemDBComplete = false;
private MsgFactory msgFactory = new MsgFactory();
@ -100,6 +101,7 @@ public class LegacyPort {
SerialPortManager serialPortManager, ScheduledExecutorService scheduler) {
this.name = config.getRedactedPort();
this.driver = driver;
this.scheduler = scheduler;
this.modem = new Modem();
addListener(modem);
this.ioStream = IOStream.create(config.parse(), scheduler, serialPortManager);
@ -178,10 +180,8 @@ public class LegacyPort {
return;
}
readThread = new Thread(reader);
setParamsAndStart(readThread, "OH-binding-" + InsteonBindingConstants.BINDING_ID + "-LegacyReader");
writeThread = new Thread(writer);
setParamsAndStart(writeThread, "OH-binding-" + InsteonBindingConstants.BINDING_ID + "-LegacyWriter");
readJob = scheduler.schedule(reader, 0, TimeUnit.SECONDS);
writeJob = scheduler.schedule(writer, 0, TimeUnit.SECONDS);
if (!mdbb.isComplete()) {
modem.initialize();
@ -192,14 +192,6 @@ public class LegacyPort {
disconnected.set(false);
}
private void setParamsAndStart(@Nullable Thread thread, String type) {
if (thread != null) {
thread.setName("OH-binding-Insteon " + name + " " + type);
thread.setDaemon(true);
thread.start();
}
}
/**
* Stops all threads
*/
@ -219,32 +211,17 @@ public class LegacyPort {
ioStream.close();
}
Thread readThread = this.readThread;
if (readThread != null) {
readThread.interrupt();
ScheduledFuture<?> readJob = this.readJob;
if (readJob != null) {
readJob.cancel(true);
this.readJob = null;
}
Thread writeThread = this.writeThread;
if (writeThread != null) {
writeThread.interrupt();
ScheduledFuture<?> writeJob = this.writeJob;
if (writeJob != null) {
writeJob.cancel(true);
this.writeJob = null;
}
logger.debug("waiting for read thread to exit for port {}", name);
try {
if (readThread != null) {
readThread.join();
}
} catch (InterruptedException e) {
logger.debug("got interrupted waiting for read thread to exit.");
}
logger.debug("waiting for write thread to exit for port {}", name);
try {
if (writeThread != null) {
writeThread.join();
}
} catch (InterruptedException e) {
logger.debug("got interrupted waiting for write thread to exit.");
}
this.readThread = null;
this.writeThread = null;
logger.debug("all threads for port {} stopped.", name);
}
@ -296,6 +273,7 @@ public class LegacyPort {
* @author Bernd Pfrommer - Initial contribution
*/
class IOStreamReader implements Runnable {
private static final int READ_BUFFER_SIZE = 1024;
private ReplyType reply = ReplyType.GOT_ACK;
private Object replyLock = new Object();
@ -311,20 +289,25 @@ public class LegacyPort {
@Override
public void run() {
logger.debug("starting reader...");
byte[] buffer = new byte[readSize];
logger.debug("starting reader thread");
byte[] buffer = new byte[READ_BUFFER_SIZE];
try {
for (int len = -1; (len = ioStream.read(buffer)) > 0;) {
msgFactory.addData(buffer, len);
processMessages();
while (!Thread.interrupted()) {
logger.trace("reader checking for input data");
// this call blocks until input data is available
int len = ioStream.read(buffer);
if (len > 0) {
msgFactory.addData(buffer, len);
processMessages();
}
}
} catch (InterruptedException e) {
logger.debug("reader thread got interrupted!");
logger.trace("reader thread got interrupted!");
} catch (IOException e) {
logger.debug("got an io exception in the reader thread");
logger.trace("reader thread got an io exception", e);
disconnected();
}
logger.debug("reader thread exiting!");
logger.debug("exiting reader thread!");
}
private void processMessages() {
@ -389,28 +372,22 @@ public class LegacyPort {
* Called by IOStreamWriter for flow control.
*
* @return true if retransmission is necessary
* @throws InterruptedException
*/
public boolean waitForReply() {
public boolean waitForReply() throws InterruptedException {
reply = ReplyType.WAITING_FOR_ACK;
while (reply == ReplyType.WAITING_FOR_ACK) {
try {
logger.trace("writer waiting for ack.");
// There have been cases observed, in particular for
// the Hub, where we get no ack or nack back, causing the binding
// to hang in the wait() below, because unsolicited messages
// do not trigger a notify(). For this reason we request retransmission
// if the wait() times out.
getRequestReplyLock().wait(30000); // be patient for 30 msec
if (reply == ReplyType.WAITING_FOR_ACK) { // timeout expired without getting ACK or NACK
logger.trace("writer timeout expired, asking for retransmit!");
reply = ReplyType.GOT_NACK;
break;
} else {
logger.trace("writer got ack: {}", (reply == ReplyType.GOT_ACK));
}
} catch (InterruptedException e) {
break; // done for the day...
}
logger.trace("writer waiting for ack.");
// There have been cases observed, in particular for
// the Hub, where we get no ack or nack back, causing the binding
// to hang in the wait() below, because unsolicited messages
// do not trigger a notify(). For this reason we request retransmission
// if the wait() times out.
getRequestReplyLock().wait(30000); // be patient for 30 msec
if (reply == ReplyType.WAITING_FOR_ACK) { // timeout expired without getting ACK or NACK
logger.trace("writer timeout expired, asking for retransmit!");
reply = ReplyType.GOT_NACK;
} else {
logger.trace("writer got ack: {}", (reply == ReplyType.GOT_ACK));
}
return reply == ReplyType.GOT_NACK;
}
@ -427,9 +404,9 @@ public class LegacyPort {
@Override
public void run() {
logger.debug("starting writer...");
while (true) {
try {
logger.debug("starting writer thread");
try {
while (!Thread.interrupted()) {
// this call blocks until the lock on the queue is released
logger.trace("writer checking message queue");
Msg msg = writeQueue.take();
@ -450,16 +427,14 @@ public class LegacyPort {
if (msg.getQuietTime() > 0) {
Thread.sleep(msg.getQuietTime());
}
} catch (InterruptedException e) {
logger.debug("got interrupted exception in write thread");
break;
} catch (IOException e) {
logger.debug("got an io exception in the write thread");
disconnected();
break;
}
} catch (InterruptedException e) {
logger.trace("writer thread got interrupted!");
} catch (IOException e) {
logger.trace("writer thread got an io exception", e);
disconnected();
}
logger.debug("writer thread exiting!");
logger.debug("exiting writer thread!");
}
}

View File

@ -233,9 +233,9 @@ public class Port {
}
}
} catch (InterruptedException e) {
logger.debug("reader thread got interrupted!");
logger.trace("reader thread got interrupted!");
} catch (IOException e) {
logger.debug("reader thread got an io exception", e);
logger.trace("reader thread got an io exception", e);
disconnected();
}
logger.debug("exiting reader thread!");
@ -333,9 +333,9 @@ public class Port {
Thread.sleep(WRITE_WAIT_TIME);
}
} catch (InterruptedException e) {
logger.debug("writer thread got interrupted!");
logger.trace("writer thread got interrupted!");
} catch (IOException e) {
logger.debug("writer thread got an io exception", e);
logger.trace("writer thread got an io exception", e);
disconnected();
}
logger.debug("exiting writer thread!");