我正在尝试修复现有的React Native库react-native-ble-plx,在现有的Java代码中添加onBackPressureBuffer()。
我知道这很丑陋,但我现在没有时间提交 PR,并且有一个悬而未决的问题可能会解决问题。 我这样做是因为事件发射器以 200Hz 工作。我需要一种安全的方式来缓冲本机端的项目,同时它们在 JavaScript 端以自己的节奏使用。
因此,代码如下所示:
final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call() {
int properties = gattCharacteristic.getProperties();
BluetoothGattDescriptor cccDescriptor = gattCharacteristic
.getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
: NotificationSetupMode.COMPAT;
if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
return connection.setupNotification(gattCharacteristic, setupMode);
}
if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
return connection.setupIndication(gattCharacteristic, setupMode);
}
return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
}
}).onBackpressureBuffer(1000) <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
}).subscribe(new Observer<byte[]>() {
@Override
public void onCompleted() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
@Override
public void onError(Throwable e) {
errorConverter.toError(e).reject(promise);
transactions.removeSubscription(transactionId);
}
@Override
public void onNext(byte[] bytes) {
characteristic.logValue("Notification from", bytes);
WritableArray jsResult = Arguments.createArray();
jsResult.pushNull();
jsResult.pushMap(characteristic.toJSObject(bytes));
jsResult.pushString(transactionId);
sendEvent(Event.ReadEvent, jsResult);
}
});
我的问题是,即使添加了这些内容,我也会遇到MissingBackPressure异常。
我尝试过 onBackPressureDrop(),我有完全相同的行为。所以我认为我做错了,但现在无法弄清楚为什么。
任何帮助表示赞赏。
正如您所说,您面临着react-native
库的问题,并且上面的代码之前确实抛出了MissingBackpressureException
。
来自.onBackpressureDrop()
的 Javadoc(粗体我的):
指示发出项目的速度快于其观察者可以消耗的项目丢弃的可观察量, 而不是发射那些其观察者不准备观察的项目。
如果下游请求计数达到 0,则可观察对象将避免调用 {@code onNext},直到 观察者再次调用 {@code 请求 (n)} 以增加请求计数。
背压:操作员遵循来自下游的
- 背压,并在无界中消耗源 {@code 可观察}
- 方式(即,不对其施加背压)。
- 调度程序:
- {@code onBackpressureDrop} 默认情况下不会在特定 {@link 调度程序}上运行。
可以看到链中的下一个运算符是.flatMap()
、.doOnUnsubscribe()
和.subscribe()
。
来自 Javadoc of.flatMap()
关于背压:
: 操作员尊重来自下游的背压。外部 {@code 可观察} 被消耗 在无界模式下(即,不对其施加背压)。内部 {@code 可观察} 应遵循 背压;如果违反,操作员可能会发出信号 {@code MissingBackpressureException}。
Javadoc.doOnUnsubscribe()
:
背压: {@code doOnUnsubscribe} 不与背压请求或值传递交互;背压 行为保留在其上游和下游之间。
.subscribe()
:
背压: 运算符以无限方式消耗源 {@code 可观察}(即 no 对其施加背压)。
如您所见,下面的运算符都没有.onBackpressure*()
对其施加背压。您需要添加一个在.onBackpressure*()
之后立即执行此操作的运算符。其中之一运营商是.observeOn(Scheduler)
Javadoc.observeOn()
:
背压:此运算符遵循来自下游的背压,并期望它来自源 {@code 可观察}。违反此规定 期望将导致 {@code MissingBackpressureException}。这是最常见的运算符,其中异常 弹出;寻找链上不支持背压的来源, 例如 {@code interval}、{@code timer}、{code PublishSubject} 或 {@code BehaviorSubject} 并应用任何 的 {@code onBackpressureXXX} 运算符,然后再应用 {@code observeOn} 本身。
因此,可行的代码可能如下所示:
final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call() {
int properties = gattCharacteristic.getProperties();
BluetoothGattDescriptor cccDescriptor = gattCharacteristic
.getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
: NotificationSetupMode.COMPAT;
if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
return connection.setupNotification(gattCharacteristic, setupMode);
}
if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
return connection.setupIndication(gattCharacteristic, setupMode);
}
return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
}
})
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
})
.onBackpressureBuffer(1000) // <---- Here is my modification
.observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
.subscribe(new Observer<byte[]>() {
@Override
public void onCompleted() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
@Override
public void onError(Throwable e) {
errorConverter.toError(e).reject(promise);
transactions.removeSubscription(transactionId);
}
@Override
public void onNext(byte[] bytes) {
characteristic.logValue("Notification from", bytes);
WritableArray jsResult = Arguments.createArray();
jsResult.pushNull();
jsResult.pushMap(characteristic.toJSObject(bytes));
jsResult.pushString(transactionId);
sendEvent(Event.ReadEvent, jsResult);
}
});