大多数Flowable.subscribe()
重载都会返回一个Disposable
,以便清理流。我习惯于做:
Disposable d = Flowable.just()
.map(...)
.subscribe(
n -> ...
t -> ...
() -> ...
);
// someone clicks "cancel" in another thread
d.dispose();
但是,使用.subscribe(Subscriber)
时,不会返回Disposable
。我想使用.subscribe(Subscriber)
,以便我可以传入TestSubscriber
来验证行为。那么在这种情况下,我将如何处置流呢?
我在Javadoc中搜索了适当的Subscriber
。有DisposableSubscriber
看起来可以工作,但有两个问题:
- 类描述如下所示,这表明不能从流外部使用
cancel()
:
使用 protected request(long( 请求更多项,使用cancel(( 取消 onNext 实现中的序列。
- TestSubscriber 不会扩展 DisposableSubscriber。
您可以使用Flowable.subscribeWith(Subscriber)
而不是subscribe
,以便返回您的Subscriber
,而不是void
。
在 RxJava 3.x 中,TestSubscriber
不再实现Disposable
。它确实实现了dispose
和isDisposed
方法,如它扩展的BaseTestConsumer
所定义。但是,这两种方法都已protected
,因此您实际上不能直接使用它们。幸运的是,有TestSubscriber.cancel()
/TestSubscriber.isCancelled()
,它们是公开的,相当于dispose()
/isDisposed()
,因此您可以使用它们。
至于Flowable.subscribe
不返回Disposable
的原因,此更改是在RxJava 2中进行的,以遵守Reactive-Streams规范:
由于反应流规范,
Publisher.subscribe
返回void
...为了解决这个问题,方法E subscribeWith(E subscriber)
已添加到每个基反应类中,该类按原样返回其输入订阅者/观察者。