为什么 Flowable.subscribe(Subscriber) 不返回一次性?



大多数Flowable.subscribe()重载都会返回一个Disposable,以便清理流。我习惯于做:

Disposable d = Flowable.just()
.map(...)
.subscribe(
n -> ...
t -> ...
() -> ...
);
// someone clicks "cancel" in another thread
d.dispose();

但是,使用.subscribe(Subscriber)时,不会返回Disposable。我想使用.subscribe(Subscriber),以便我可以传入TestSubscriber来验证行为。那么在这种情况下,我将如何处置流呢?

我在Javadoc中搜索了适当的Subscriber。有DisposableSubscriber看起来可以工作,但有两个问题:

  1. 类描述如下所示,这表明不能从流外部使用cancel()

使用 protected request(long( 请求更多项,使用cancel(( 取消 onNext 实现中的序列

  1. TestSubscriber 不会扩展 DisposableSubscriber。

您可以使用Flowable.subscribeWith(Subscriber)而不是subscribe,以便返回您的Subscriber,而不是void

在 RxJava 3.x 中,TestSubscriber不再实现Disposable。它确实实现了disposeisDisposed方法,如它扩展的BaseTestConsumer所定义。但是,这两种方法都已protected,因此您实际上不能直接使用它们。幸运的是,有TestSubscriber.cancel()/TestSubscriber.isCancelled(),它们是公开的,相当于dispose()/isDisposed(),因此您可以使用它们。

至于Flowable.subscribe不返回Disposable的原因,此更改是在RxJava 2中进行的,以遵守Reactive-Streams规范:

由于反应流规范,Publisher.subscribe返回void...为了解决这个问题,方法E subscribeWith(E subscriber)已添加到每个基反应类中,该类按原样返回其输入订阅者/观察者。

相关内容

  • 没有找到相关文章

最新更新