我应该在 RxJava 订阅链中将 onBackPressureBuffer(n) 放在哪里?



我正在尝试修复现有的React Native库react-native-ble-plx,在现有的Java代码中添加onBackPressureBuffer()。

我知道这很丑陋,但我现在没有时间提交 PR,并且有一个悬而未决的问题可能会解决问题。 我这样做是因为事件发射器以 200Hz 工作。我需要一种安全的方式来缓冲本机端的项目,同时它们在 JavaScript 端以自己的节奏使用。

因此,代码如下所示:

final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call() {
int properties = gattCharacteristic.getProperties();
BluetoothGattDescriptor cccDescriptor = gattCharacteristic
.getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
: NotificationSetupMode.COMPAT;
if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
return connection.setupNotification(gattCharacteristic, setupMode);
}
if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
return connection.setupIndication(gattCharacteristic, setupMode);
}
return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
}
}).onBackpressureBuffer(1000)  <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
}).subscribe(new Observer<byte[]>() {
@Override
public void onCompleted() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
@Override
public void onError(Throwable e) {
errorConverter.toError(e).reject(promise);
transactions.removeSubscription(transactionId);
}
@Override
public void onNext(byte[] bytes) {
characteristic.logValue("Notification from", bytes);
WritableArray jsResult = Arguments.createArray();
jsResult.pushNull();
jsResult.pushMap(characteristic.toJSObject(bytes));
jsResult.pushString(transactionId);
sendEvent(Event.ReadEvent, jsResult);
}
});

我的问题是,即使添加了这些内容,我也会遇到MissingBackPressure异常。

我尝试过 onBackPressureDrop(),我有完全相同的行为。所以我认为我做错了,但现在无法弄清楚为什么。

任何帮助表示赞赏。

正如您所说,您面临着react-native库的问题,并且上面的代码之前确实抛出了MissingBackpressureException

来自.onBackpressureDrop()的 Javadoc(粗体我的):

指示发出项目的速度快于其观察者可以消耗的项目丢弃的可观察量, 而不是发射那些其观察者不准备观察的项目。

如果下游请求计数达到 0,则可观察对象将避免调用 {@code onNext},直到 观察者再次调用 {@code 请求 (n)} 以增加请求计数。

背压
:操作员遵循来自下游的
背压,并在无界中消耗源 {@code 可观察}
方式(即,不对其施加背压)。
调度程序:
{@code onBackpressureDrop} 默认情况下不会在特定 {@link 调度程序}上运行。

可以看到链中的下一个运算符是.flatMap().doOnUnsubscribe().subscribe()

来自 Javadoc of.flatMap()关于背压:

背压

操作员尊重来自下游的背压。外部 {@code 可观察} 被消耗 在无界模式下(即,不对其施加背压)。内部 {@code 可观察} 应遵循 背压;如果违反,操作员可能会发出信号 {@code MissingBackpressureException}。

Javadoc.doOnUnsubscribe()

背压:
{@code doOnUnsubscribe} 不与背压请求或值传递交互;背压 行为保留在其上游和下游之间。

.subscribe()

背压:
运算符以无限方式消耗源 {@code 可观察}(即 no 对其施加背压)。

如您所见,下面的运算符都没有.onBackpressure*()对其施加背压。您需要添加一个在.onBackpressure*()之后立即执行此操作的运算符。其中之一运营商是.observeOn(Scheduler)

Javadoc.observeOn()

背压:此运算符遵循来自下游的背压,并期望它来自源 {@code 可观察}。违反此规定 期望将导致 {@code MissingBackpressureException}。这是最常见的运算符,其中异常 弹出;寻找链上不支持背压的来源, 例如 {@code interval}、{@code timer}、{code PublishSubject} 或 {@code BehaviorSubject} 并应用任何 的 {@code onBackpressureXXX} 运算符,然后再应用 {@code observeOn} 本身。

因此,可行的代码可能如下所示:

final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call() {
int properties = gattCharacteristic.getProperties();
BluetoothGattDescriptor cccDescriptor = gattCharacteristic
.getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
: NotificationSetupMode.COMPAT;
if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
return connection.setupNotification(gattCharacteristic, setupMode);
}
if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
return connection.setupIndication(gattCharacteristic, setupMode);
}
return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
}
})
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
})
.onBackpressureBuffer(1000) // <---- Here is my modification
.observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
.subscribe(new Observer<byte[]>() {
@Override
public void onCompleted() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
@Override
public void onError(Throwable e) {
errorConverter.toError(e).reject(promise);
transactions.removeSubscription(transactionId);
}
@Override
public void onNext(byte[] bytes) {
characteristic.logValue("Notification from", bytes);
WritableArray jsResult = Arguments.createArray();
jsResult.pushNull();
jsResult.pushMap(characteristic.toJSObject(bytes));
jsResult.pushString(transactionId);
sendEvent(Event.ReadEvent, jsResult);
}
});

相关内容

  • 没有找到相关文章

最新更新