我使用 PublishProcessor.offer(( 从上游发出。我读到(*(,如果订阅者没有准备好接收下一个事件,它可能会返回 false,因为 PublishProcessor 不协调背压。我想知道它是如何工作的。PublishProcessor 如何知道订阅者尚未准备就绪?
我读到上游可能知道下游通过无功拉动处理排放的潜力:
someObservable.subscribe(new Subscriber<t>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onCompleted() {
// gracefully handle sequence-complete
}
@Override
public void onError(Throwable e) {
// gracefully handle error
}
@Override
public void onNext(t n) {
// do something with the emitted item "n"
// request another item:
request(1);
}
});
https://github.com/ReactiveX/RxJava/wiki/Backpressure
但我的订阅看起来像这样:
publishProcessor.subscribe(new Consumer<T>() {
@Override
public void accept(T t) throws Exception {
// do IO
}
}, Log::submitCrash);
它是在 accept(( 完成后内部调用 request(1(; 还是不执行反应式拉取?我试图阅读代码,但似乎并非如此。此使用者将传递给 LambdaSubscriber。LambdaSubscriber.onNext(( 不会调用 request(n(。
这个订阅方法的javadoc:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
说:运算符以无限的方式使用源{@code发布者}(即,没有 对其施加背压(。
所以它没有提到反应性拉力。以无限制方式使用源是否意味着将无限背压缓冲区应用于此订阅或根本不应用背压缓冲区?
是否有其他机制可以让 PublishProcessor 知道它的订阅者是否完成了对发出值的消耗?
(*(
PublishProcessor 既是 Flowable Processor,也是 FlowableProcessor, 但是,它不能协调不同之间的背压 订阅服务器以及上游源和订阅服务器之间。如果 上游项目通过onNext(对象(接收,如果订阅者不是 准备接收项目,该订户通过 缺少背压异常。要避免这种情况,请使用 offer(对象( 如果返回 false,请稍后重试。
http://reactivex.io/RxJava/javadoc/io/reactivex/processors/PublishProcessor.html
每个Subscriber
都会收到一个特殊的Subscription
,该跟踪消费者发行的request
金额,如果PublishProcessor
可以在Subscriber
上调用onNext
,则减少此金额。
如果此跟踪金额为零并在offer
内检查,则认为消费者尚未准备好。
如果您同步消费PublishProcessor
并从onNext
内发出request
,则跟踪金额将始终大于零,因此offer
可以通过。
但是,如果您使用异步,例如,通过应用observeOn
,处理器和使用者之间现在有一个有界缓冲区,该缓冲区可能会填满,并且跟踪的请求量可能最终为零,从而阻止offer
发出更多信号。
无限消费意味着消费者发出request(Long.MAX_VALUE)
,这被解释为消费者准备接收任意数量的物品。PublishProcessor
本身会这样做,以防您将其订阅到另一个Publisher
。
一般来说,无界并不意味着无限缓冲,因为特定的使用者可能会在onNext
的调用线程上同步丢弃、批处理、采样或处理项目,因此不会发生溢出。