我有一个使用RxAndroidBle作为BLE解决方案的Android应用程序,这真的很棒,节省了大量的工作时间。
但是最近我必须实现更新固件功能,但我遇到了困难。
其中一个自定义BLE设备在写入之前必须等待通知ByteAray。每 16 个包(每个包 20 字节(,设备将向某些特征发送通知 ByteAray。
所以我要做的是等待通知,然后发送这些固件包。而且我发现我必须添加一个 160ms 的计时器,这样设备就不会被这些包(背压?
还是没有运气。设备将无响应,然后在一定数量的数据后断开连接,例如 256 字节 * 12(文件大小范围从 256 字节 * 330 ~ 785(。
这是当前的实现:
.flatMap { ifFrameCountAccepted ->
if (ifFrameCountAccepted) {
Timber.d("Wait 2 seconds cleaning up flash")
Flowable
.timer(3000, TimeUnit.MILLISECONDS)
.flatMap { sendFramesFlowable(firmware, isPic) }
} else {
Flowable.error(RuntimeException("MCU L2 frame count error."))
}
}
.toObservable()
.flatMap { isFirmwareTransmissionDone ->
if (isFirmwareTransmissionDone) {
waitUntilL2McuUpgradeFinish(isPic)
} else {
Observable.just(false)
}
}
private fun sendFramesFlowable(
firmware: FirmwareUpgradeData,
isPic: Boolean
): Flowable<Boolean> {
Timber.d("Firmware size: ${firmware.processed.size}")
val frameInvalidPublishSubject = PublishSubject.create<Boolean>()
val frameObservable = connection.toFlowable(BackpressureStrategy.BUFFER).flatMap { rxConnection ->
rxConnection.setupNotification(
RC_NOTIFICATION_CHARACTERISTIC,
NotificationSetupMode.DEFAULT
).toFlowable(BackpressureStrategy.BUFFER)
.concatMap { notification ->
notification.toFlowable(BackpressureStrategy.BUFFER)
}
}
.filter { it.size == 20 }
.filter { it.second.first().toUnsignedValue() == COMMAND_HEADER_L2_FRAME }
.map { reply ->
Timber.d("[A2] decoded: ${reply.second.toHex()}")
val payload = reply.second.dataPayload(reply.first)
val isValid = upgradeDataTransmission.resolveFrameCommand(payload)
Timber.d("MCU L2 upgrade frame is accepted: $isValid")
unless(!isValid) {
frameInvalidPublishSubject.onNext(true)
}
isValid
}
.zipWith(Flowable.range(1, firmware.frameCount), BiFunction { _: Boolean, frameCount: Int ->
frameCount
})
.doOnNext { frameCount ->
val base = if (isPic) 45.minus(25) else 99.minus(45)
val progress = base.div(firmware.frameCount.toFloat())
.times(frameCount).toInt()
.plus(if (isPic) 25 else 45)
_upgradeProgress.postValue(Event.success(progress))
}
.flatMap { frameCount ->
Timber.d("frame count now is:$frameCount")
if (frameCount == firmware.frameCount) {
triggerL2Upgrade(firmware.crcCheck)
} else {
Flowable.just(false)
}
}
Flowable.fromIterable(firmware.processed.withIndex())
.buffer(16)
.takeUntil(frameInvalidPublishSubject.toFlowable(BackpressureStrategy.BUFFER))
.concatMap { frame ->
Flowable.timer(160, TimeUnit.MILLISECONDS).concatMap {
Flowable.fromIterable(frame)
.concatMap { perPackage ->
connection.toFlowable(BackpressureStrategy.BUFFER)
.concatMap {
it.writeCharacteristic(RC_WRITE_CHARACTERISTIC, perPackage.value.toByteArray())
.toFlowable()
}
}
}
}
.forEachWhile {
true
}
return frameObservable
}
对于这种情况,RxAndroidBle
中有一个内置的帮助程序:RxBleConnection.createLongWriteBuilder()
您可以阅读它为您提供的确切可能性,但使用writeOperationAckStrategy()
您可以推迟写入下一个字节数组:
val notificationsEvery16thWrite: Observable<ByteArray> = (...)
rxBleConnection.createNewLongWriteBuilder()
.setWriteOperationAckStrategy { writeAcks ->
val emitEveryWriteAckApartEvery16th = writeAcks
.scan(0 to null as Boolean?) { acc, boolean ->
acc.first.plus(1) to boolean
}
.filter { it.first != 0 && it.first % 16 != 0 }
.map { it.second!! }
val emitEvery16thWriteAckAfterNotification = notificationsEvery16thWrite.zipWith(
writeAcks.buffer(16), // buffer 16 writes ACKs
BiFunction { _, writesCompleted -> writesCompleted.last() }) // use last when notification arrives
Observable.merge(
emitEveryWriteAckApartEvery16th,
emitEvery16thWriteAckAfterNotification
)
}