我正在尝试通过rxJava将命令列表发送到设备。这是我的代码:
public void startWriteCommucation(final ArrayList<byte[]> b) {
if (isConnected()){
connectionObservable
.flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call(final RxBleConnection rxBleConnection) {
final List<Observable<byte[]>> list = new ArrayList<>();
for (byte[] bytes: b){
Log.e("Observer", Arrays.toString(bytes));
list.add(rxBleConnection
.writeCharacteristic(BleDevice.characteristicWrite, bytes));
}
return Observable.from(list);
}
})
.concatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
view.setTextStatus("Write success");
Log.e("Subscriber", Arrays.toString(bytes));
}
});
}
}
它有效,然后我单击按钮一次。例如,我的 clikc 方法:
public void onClick(){
ArrayList<byte[]> listCmd = new ArrayList<>();
listCmd.add(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
listCmd.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
startWriteCommucation(listCmd);
}
和 LogCat 中的 myLogs:
E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
但是当我使用快速双击按钮时出现问题。然后第一次点击可观察仍然有效,我再次点击再次调用 startWriteCommunication 方法。在此之后,我的日志看起来如此:
E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
主要问题是它们不正常,我的设备工作不正确。你能帮忙找一个扑通吗?
问题是RxAndroidBle库错误(它使响应与请求不匹配)和共享两个有状态的通信流之间的连接(需要按顺序进行两次写入,中间没有任何通信)。
错误:应该写入蓝牙GattFeature的值(byte[])设置得太早了。如果同一特征有两个并行写入器 - 其中一个可以覆盖由于争用条件而由另一个设置的 byte[]。我已经对库进行了修复,该库现在正在进行代码审查,应该在不久的将来应用于 SNAPSHOT 版本。
更改后,输出将如下所示:
D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
可能的解决方案
如果您对触发流两次不感兴趣,如果用户将快速点击按钮两次 - 您可以创建一个可共享的流:
Observable<byte[]> theSharedFlow = rxBleConnection
.writeCharacteristic(uuid, data1)
.flatMap(writtenBytes -> rxBleConnection.writeCharacteristic(uuid, data2))
.share()
多次订阅时只会执行一次,直到完成。在上面的代码片段中,第二个writeCharacteristic()
将被订阅(并排队等待通信),然后第一个将发出写入的字节。
如果应用程序打算在共享连接时按任意时间按顺序发送任意命令集,则应用程序需要确保前一组命令已完成。
我希望我已经回答了你的问题。如果您提供有关用例的更多信息,我将尝试改进我的答案。
此致敬意
编辑:
替代解决方案:
为了保留订单,需要订阅所有可观察量才能到达。可观察的合约是可观察的(如果它是冷的)在订阅之前不会执行。当使用flatMap()
时,一旦第一个可观察量发出,就会订阅第二个可观察量。
要按顺序传输两个写入,它们必须以相同的顺序订阅,因此流可能如下所示:
connectionObservable
.flatMap(rxBleConnection -> {
Observable<byte[]> mergedObservable = null;
for (byte[] bytes : b) {
Log.d("Observer", Arrays.toString(bytes));
final Observable<byte[]> writeObservable = rxBleConnection
.writeCharacteristic(uuid, bytes);
if (mergedObservable == null) {
mergedObservable = writeObservable;
} else {
// merging two Observables to be subscribed at the same time when subscribed
mergedObservable = mergedObservable.mergeWith(writeObservable);
}
}
return mergedObservable;
})
// removed .concatMap()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
bytes -> Log.d("Subscriber", Arrays.toString(bytes)),
throwable -> Log.e("Subscriber", "error", throwable)
);
RxJava显然有更多的方法来实现相同的行为,但这不是这个问题的一部分。