Xiaomi: implement requesting chunks for incomplete transactions

When using characteristics for communication with a Xiaomi protobuf
device, a chunked transaction could be sent out-of-sequence or
incomplete, which would result in either a corrupted payload or the
characteristic to halt because the device would not receive an
acknowledgement.

This changeset implements a map to store the received chunk data and
sequence number, that allows us to reassemble the payload in sequence
after confirming having received all chunks for the transaction. A list
of missing chunks is sent to the device if not all chunks have been
received and 5 seconds have passed since the last chunk was sent.
This commit is contained in:
MrYoranimo 2024-08-23 17:17:10 +02:00 committed by José Rebelo
parent 11ac1ed8bf
commit 44d50e6246
2 changed files with 118 additions and 24 deletions

View File

@ -222,9 +222,9 @@ public class XiaomiBleSupport extends XiaomiConnectionSupport {
} }
@Override @Override
public void disconnect() { public void dispose() {
mXiaomiSupport.onDisconnect(); mXiaomiSupport.onDisconnect();
super.disconnect(); super.dispose();
} }
}; };
@ -329,5 +329,13 @@ public class XiaomiBleSupport extends XiaomiConnectionSupport {
@Override @Override
public void dispose() { public void dispose() {
commsSupport.dispose(); commsSupport.dispose();
if (characteristicCommandRead != null)
characteristicCommandRead.dispose();
if (characteristicCommandWrite != null)
characteristicCommandWrite.dispose();
if (characteristicDataUpload != null)
characteristicDataUpload.dispose();
if (characteristicActivityData != null)
characteristicActivityData.dispose();
} }
} }

View File

@ -18,6 +18,8 @@ package nodomain.freeyourgadget.gadgetbridge.service.devices.xiaomi;
import android.bluetooth.BluetoothGattCharacteristic; import android.bluetooth.BluetoothGattCharacteristic;
import android.os.Handler;
import android.os.Looper;
import androidx.annotation.Nullable; import androidx.annotation.Nullable;
@ -28,17 +30,23 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.UUID; import java.util.UUID;
import nodomain.freeyourgadget.gadgetbridge.service.btle.BLETypeConversions; import nodomain.freeyourgadget.gadgetbridge.service.btle.BLETypeConversions;
import nodomain.freeyourgadget.gadgetbridge.service.btle.BtLEQueue;
import nodomain.freeyourgadget.gadgetbridge.service.btle.TransactionBuilder; import nodomain.freeyourgadget.gadgetbridge.service.btle.TransactionBuilder;
import nodomain.freeyourgadget.gadgetbridge.util.GB; import nodomain.freeyourgadget.gadgetbridge.util.GB;
public class XiaomiCharacteristic { public class XiaomiCharacteristic {
private final Logger LOG = LoggerFactory.getLogger(XiaomiCharacteristic.class); private final Logger LOG = LoggerFactory.getLogger(XiaomiCharacteristic.class);
private static final long TIMEOUT_TASK_DELAY = 5000L;
public static final byte[] PAYLOAD_ACK = new byte[]{0, 0, 3, 0}; public static final byte[] PAYLOAD_ACK = new byte[]{0, 0, 3, 0};
@ -59,8 +67,8 @@ public class XiaomiCharacteristic {
// Chunking // Chunking
private int numChunks = 0; private int numChunks = 0;
private int currentChunk = 0; private final Handler timeoutHandler = new Handler(Looper.getMainLooper());
private final ByteArrayOutputStream chunkBuffer = new ByteArrayOutputStream(); private final Map<Integer, byte[]> receivedChunks = new HashMap<>();
// Scheduling // Scheduling
// TODO timeouts // TODO timeouts
@ -99,13 +107,13 @@ public class XiaomiCharacteristic {
public void reset() { public void reset() {
this.numChunks = 0; this.numChunks = 0;
this.currentChunk = 0;
this.encryptedIndex = 1; // 0 is used by auth service this.encryptedIndex = 1; // 0 is used by auth service
this.chunkBuffer.reset(); this.receivedChunks.clear();
this.payloadQueue.clear(); this.payloadQueue.clear();
this.waitingAck = false; this.waitingAck = false;
this.sendingChunked = false; this.sendingChunked = false;
this.currentPayload = null; this.currentPayload = null;
cancelTimeoutTask();
} }
/** /**
@ -157,37 +165,115 @@ public class XiaomiCharacteristic {
builder.write(bluetoothGattCharacteristic, chunkToSend); builder.write(bluetoothGattCharacteristic, chunkToSend);
} }
public void dispose() {
cancelTimeoutTask();
}
private void requestMissingChunks() {
if (!(numChunks > 0)) {
LOG.warn("Timeout task ran but not expecting any chunks");
return;
}
LOG.debug("Timeout reached while waiting for all chunks from device");
final List<Integer> missingChunks = new ArrayList<>();
for (int i = 0; i < numChunks; i++) {
if (!this.receivedChunks.containsKey(i + 1)) {
missingChunks.add(i + 1);
}
}
// prevent going over maximum message length
int reqChunkCount = Math.min(missingChunks.size(), (maxWriteSize - 4) / 2);
if (reqChunkCount < missingChunks.size()) {
LOG.debug("Missing {} chunk(s), only requesting first {}: {}", missingChunks.size(), reqChunkCount, missingChunks);
} else {
LOG.debug("Missing {} chunk(s): {}", missingChunks.size(), missingChunks);
}
final ByteBuffer bb = ByteBuffer.allocate(4 + reqChunkCount * 2).order(ByteOrder.LITTLE_ENDIAN);
bb.putShort((short) 0); // chunk ID
bb.put((byte) 1); // type CHUNKED_ACK
bb.put((byte) 5); // indicate partially received transmission, followed by missing chunks
for (int i = 0; i < reqChunkCount; i++) {
bb.putShort(missingChunks.get(i).shortValue());
}
final TransactionBuilder tb = mSupport.createTransactionBuilder(String.format("send nack with missing chunks %s", missingChunks));
tb.write(bluetoothGattCharacteristic, bb.array());
tb.queue(mSupport.getQueue());
}
private void cancelTimeoutTask() {
this.timeoutHandler.removeCallbacksAndMessages(null);
}
private void rescheduleTimeoutTask() {
cancelTimeoutTask();
this.timeoutHandler.postDelayed(this::requestMissingChunks, TIMEOUT_TASK_DELAY);
}
private byte[] reconstructPayloadFromChunks() {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
for (int i = 0; i < this.numChunks; i++) {
if (!this.receivedChunks.containsKey(i + 1) || this.receivedChunks.get(i + 1) == null) {
LOG.error("Missing chunk {}", i + 1);
return new byte[0];
}
out.write(this.receivedChunks.get(i + 1));
}
} catch (final IOException ex) {
LOG.error("Failed to reconstruct payload", ex);
return new byte[0];
}
return out.toByteArray();
}
public void onCharacteristicChanged(final byte[] value) { public void onCharacteristicChanged(final byte[] value) {
final ByteBuffer buf = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN); final ByteBuffer buf = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN);
final int chunk = buf.getShort(); final int chunk = buf.getShort();
if (chunk != 0) { if (chunk != 0) {
// Chunked packet // Chunked packet
LOG.debug("Got chunk {} of {}", chunk, numChunks);
if (chunk > numChunks) {
LOG.warn("Ignoring chunk {} exceeding upper bound {}", chunk, numChunks);
return;
}
if (this.receivedChunks.containsKey(chunk)) {
LOG.warn("Already received chunk {}", chunk);
}
final byte[] chunkBytes = new byte[buf.limit() - buf.position()]; final byte[] chunkBytes = new byte[buf.limit() - buf.position()];
buf.get(chunkBytes); buf.get(chunkBytes);
try { this.receivedChunks.put(chunk, chunkBytes);
chunkBuffer.write(chunkBytes); rescheduleTimeoutTask();
} catch (final IOException e) {
throw new RuntimeException(e);
}
currentChunk++;
LOG.debug("Got chunk {} of {}", currentChunk, numChunks);
if (chunk == numChunks) {
sendChunkEndAck();
if (channelHandler != null) { if (this.receivedChunks.keySet().size() == numChunks) {
cancelTimeoutTask();
sendChunkEndAck();
final byte[] payload = reconstructPayloadFromChunks();
if (payload.length == 0) {
LOG.warn("Payload reconstructed from chunks was empty");
} else if (channelHandler != null) {
if (isEncrypted) { if (isEncrypted) {
// chunks are always encrypted if an auth service is available // chunks are always encrypted if an auth service is available
channelHandler.handle(authService.decrypt(chunkBuffer.toByteArray())); channelHandler.handle(authService.decrypt(payload));
} else { } else {
channelHandler.handle(chunkBuffer.toByteArray()); channelHandler.handle(payload);
} }
} else { } else {
LOG.warn("Channel handler for char {} is null!", characteristicUUID); LOG.warn("Channel handler for char {} is null!", characteristicUUID);
} }
currentChunk = 0; this.numChunks = 0;
chunkBuffer.reset(); this.receivedChunks.clear();
} }
} else { } else {
// Not a chunk / single-packet // Not a chunk / single-packet
@ -196,16 +282,16 @@ public class XiaomiCharacteristic {
switch (type) { switch (type) {
case 0: case 0:
// Chunked start request // Chunked start request
// TODO verify previous transfer completed
final byte messageEncrypted = buf.get(); final byte messageEncrypted = buf.get();
byte expectedResult = (byte) (isEncrypted ? 1 : 0); byte expectedResult = (byte) (isEncrypted ? 1 : 0);
if (messageEncrypted != expectedResult) { if (messageEncrypted != expectedResult) {
LOG.warn("Chunked start request: expected {}, got {}", expectedResult, messageEncrypted); LOG.warn("Chunked start request: expected {}, got {}", expectedResult, messageEncrypted);
return; return;
} }
numChunks = buf.getShort(); this.numChunks = buf.getShort();
currentChunk = 0; this.receivedChunks.clear();
chunkBuffer.reset(); LOG.debug("Got chunked start request for {} chunks", this.numChunks);
LOG.debug("Got chunked start request for {} chunks", numChunks);
sendChunkStartAck(); sendChunkStartAck();
return; return;
case 1: case 1: