BtBRQueue: use Handler(Thread) for sending messages and connecting socket

This commit is contained in:
MrYoranimo 2024-01-15 15:11:56 +01:00 committed by José Rebelo
parent ae97e961b9
commit 1185699c56

View File

@ -21,6 +21,12 @@ import android.bluetooth.BluetoothAdapter;
import android.bluetooth.BluetoothDevice;
import android.bluetooth.BluetoothSocket;
import android.content.Context;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Message;
import android.os.Process;
import androidx.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,16 +35,16 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import nodomain.freeyourgadget.gadgetbridge.impl.GBDevice;
import nodomain.freeyourgadget.gadgetbridge.service.DeviceSupport;
import nodomain.freeyourgadget.gadgetbridge.util.GB;
public final class BtBRQueue {
private static final Logger LOG = LoggerFactory.getLogger(BtBRQueue.class);
public static final int HANDLER_SUBJECT_CONNECT = 0;
public static final int HANDLER_SUBJECT_PERFORM_TRANSACTION = 1;
private BluetoothAdapter mBtAdapter = null;
private BluetoothSocket mBtSocket = null;
@ -46,57 +52,83 @@ public final class BtBRQueue {
private final SocketCallback mCallback;
private final UUID mService;
private final BlockingQueue<AbstractTransaction> mTransactions = new LinkedBlockingQueue<>();
private volatile boolean mDisposed;
private volatile boolean mCrashed;
private final Context mContext;
private CountDownLatch mConnectionLatch;
private CountDownLatch mAvailableData;
private final int mBufferSize;
private Thread writeThread = new Thread("Write Thread") {
private Handler mWriteHandler;
private final HandlerThread mWriteHandlerThread = new HandlerThread("Write Thread", Process.THREAD_PRIORITY_BACKGROUND) {
@Override
public void run() {
LOG.debug("Started write thread for {} (address {})", mGbDevice.getName(), mGbDevice.getAddress());
while (!mDisposed && !mCrashed) {
try {
AbstractTransaction qTransaction = mTransactions.take();
if (!isConnected()) {
LOG.debug("Not connected, waiting for connection...");
setDeviceConnectionState(GBDevice.State.NOT_CONNECTED);
// wait until the connection succeeds before running the actions
// Note that no automatic connection is performed. This has to be triggered
// on the outside typically by the DeviceSupport. The reason is that
// devices have different kinds of initializations and this class has no
// idea about them.
mConnectionLatch = new CountDownLatch(1);
mConnectionLatch.await();
mConnectionLatch = null;
}
LOG.info("Ready for a new message exchange.");
Transaction transaction = (Transaction)qTransaction;
for (BtBRAction action : transaction.getActions()) {
if (LOG.isDebugEnabled()) {
LOG.debug("About to run action: " + action);
protected void onLooperPrepared() {
LOG.debug("Write handler thread's looper prepared, creating write handler");
mWriteHandler = new Handler(mWriteHandlerThread.getLooper()) {
@SuppressLint("MissingPermission")
@Override
public void handleMessage(@NonNull Message msg) {
switch (msg.what) {
case HANDLER_SUBJECT_CONNECT: {
try {
mBtSocket.connect();
LOG.info("Connected to RFCOMM socket for {}", mGbDevice.getName());
setDeviceConnectionState(GBDevice.State.CONNECTED);
// update thread names to show device names in logs
readThread.setName(String.format(Locale.ENGLISH,
"Read Thread for %s", mGbDevice.getName()));
mWriteHandlerThread.setName(String.format(Locale.ENGLISH,
"Write Thread for %s", mGbDevice.getName()));
// now that connect has been created, start the threads
readThread.start();
onConnectionEstablished();
} catch (IOException e) {
LOG.error("IO exception while establishing socket connection: ", e);
setDeviceConnectionState(GBDevice.State.NOT_CONNECTED);
}
return;
}
if (action.run(mBtSocket)) {
LOG.debug("Action ok: " + action);
} else {
LOG.error("Action returned false: " + action);
break;
case HANDLER_SUBJECT_PERFORM_TRANSACTION: {
try {
if (!isConnected()) {
LOG.debug("Not connected, updating device state to WAITING_FOR_RECONNECT");
setDeviceConnectionState(GBDevice.State.WAITING_FOR_RECONNECT);
return;
}
if (!(msg.obj instanceof Transaction)) {
LOG.error("msg.obj is not an instance of Transaction");
return;
}
Transaction transaction = (Transaction) msg.obj;
for (BtBRAction action : transaction.getActions()) {
if (LOG.isDebugEnabled()) {
LOG.debug("About to run action: {}", action);
}
if (action.run(mBtSocket)) {
LOG.debug("Action ok: {}", action);
} else {
LOG.error("Action returned false, cancelling further actions in transaction: {}", action);
break;
}
}
} catch (Throwable ex) {
LOG.error("IO Write Thread died: " + ex.getMessage(), ex);
}
return;
}
}
} catch (InterruptedException ignored) {
mConnectionLatch = null;
LOG.debug("Thread interrupted");
} catch (Throwable ex) {
LOG.error("IO Write Thread died: " + ex.getMessage(), ex);
mCrashed = true;
mConnectionLatch = null;
LOG.warn("Unhandled write handler message {}", msg.what);
}
}
};
}
};
@ -108,44 +140,17 @@ public final class BtBRQueue {
LOG.debug("Read thread started, entering loop");
while (!mDisposed && !mCrashed) {
while (!mDisposed) {
try {
if (!isConnected()) {
LOG.debug("not connected, waiting for connection...");
// wait until the connection succeeds before running the actions
// Note that no automatic connection is performed. This has to be triggered
// on the outside typically by the DeviceSupport. The reason is that
// devices have different kinds of initializations and this class has no
// idea about them.
mConnectionLatch = new CountDownLatch(1);
mConnectionLatch.await();
mConnectionLatch = null;
}
if (mAvailableData != null) {
if (mBtSocket.getInputStream().available() == 0) {
mAvailableData.countDown();
}
}
nRead = mBtSocket.getInputStream().read(buffer);
// safety measure
if (nRead == -1) {
throw new IOException("End of stream");
}
} catch (InterruptedException ignored) {
LOG.debug("Thread interrupted");
mConnectionLatch = null;
continue;
} catch (Throwable ex) {
if (mAvailableData == null) {
LOG.error("IO read thread died: " + ex.getMessage(), ex);
mCrashed = true;
}
mConnectionLatch = null;
continue;
} catch (IOException ex) {
LOG.error("IO exception while reading message from socket, breaking out of read thread: ", ex);
break;
}
LOG.debug("Received {} bytes: {}", nRead, GB.hexdump(buffer, 0, nRead));
@ -157,7 +162,8 @@ public final class BtBRQueue {
}
}
LOG.debug("Exited read thread loop");
LOG.debug("Exited read thread loop, calling disconnect()");
disconnect();
}
};
@ -168,6 +174,8 @@ public final class BtBRQueue {
mCallback = socketCallback;
mService = supportedService;
mBufferSize = bufferSize;
mWriteHandlerThread.start();
}
/**
@ -196,24 +204,6 @@ public final class BtBRQueue {
try {
BluetoothDevice btDevice = mBtAdapter.getRemoteDevice(mGbDevice.getAddress());
mBtSocket = btDevice.createRfcommSocketToServiceRecord(mService);
LOG.debug("RFCOMM socket created, connecting");
// TODO this call is blocking, which makes this method preferably called from a background thread
mBtSocket.connect();
LOG.info("Connected to RFCOMM socket for {}", mGbDevice.getName());
setDeviceConnectionState(GBDevice.State.CONNECTED);
// update thread names to show device names in logs
readThread.setName(String.format(Locale.ENGLISH,
"Read Thread for %s", mGbDevice.getName()));
writeThread.setName(String.format(Locale.ENGLISH,
"Write Thread for %s", mGbDevice.getName()));
// now that connect has been created, start the threads
readThread.start();
writeThread.start();
} catch (IOException e) {
LOG.error("Unable to connect to RFCOMM endpoint: ", e);
setDeviceConnectionState(originalState);
@ -221,7 +211,8 @@ public final class BtBRQueue {
return false;
}
onConnectionEstablished();
LOG.debug("Socket created, connecting in handler");
mWriteHandler.sendMessageAtFrontOfQueue(mWriteHandler.obtainMessage(HANDLER_SUBJECT_CONNECT));
return true;
}
@ -230,18 +221,15 @@ public final class BtBRQueue {
}
public void disconnect() {
if (mBtSocket != null) {
if (mWriteHandlerThread.isAlive()) {
mWriteHandlerThread.quit();
}
if (mBtSocket != null && mBtSocket.isConnected()) {
try {
mAvailableData = new CountDownLatch(1);
if (!mAvailableData.await(1, TimeUnit.SECONDS)) {
LOG.warn("disconnect(): Latch timeout reached while waiting for incoming data");
}
mAvailableData = null;
mBtSocket.close();
} catch (IOException | InterruptedException e) {
LOG.error(e.getMessage());
} catch (IOException e) {
LOG.error("IO exception while closing socket in disconnect(): ", e);
}
}
}
@ -258,14 +246,15 @@ public final class BtBRQueue {
}
/**
* Adds a transaction to the end of the queue.
* Add a finalized {@link Transaction} to the write handler's queue
*
* @param transaction
* @param transaction The transaction to be run in the handler thread's looper
*/
public void add(Transaction transaction) {
LOG.debug("about to add: " + transaction);
LOG.debug("Adding transaction to looper message queue: {}", transaction);
if (!transaction.isEmpty()) {
mTransactions.add(transaction);
mWriteHandler.obtainMessage(HANDLER_SUBJECT_PERFORM_TRANSACTION, transaction).sendToTarget();
}
}
@ -282,10 +271,10 @@ public final class BtBRQueue {
mDisposed = true;
disconnect();
writeThread.interrupt();
writeThread = null;
readThread.interrupt();
readThread = null;
mTransactions.clear();
if (readThread != null && readThread.isAlive()) {
readThread.interrupt();
readThread = null;
}
}
}