我有一个rx.Observable
,它向onNext()
发送任务的进度。有时onNext()
的排放速度过快,导致Observer
无法跟上,造成背压。我想通过只缓冲Observable
的最新排放来处理背压。
-
Observable
发出1,Observer
收到1。 < - 尽管
Observer
仍然是处理strong> 1 ,Observable
发出 2 , 3 ,和 4 。 -
Observer
完成处理1,开始处理4(排放2和3被丢弃)。
这似乎是在Rx Observable中处理进度的常见情况,因为你通常只关心用最新的进度信息更新你的UI。然而,我一直没能想出如何做到这一点。
有人知道如何用RxJava实现这一点吗?
onBackPressureLatest
是你的朋友。:)http://reactivex.io/RxJava/javadoc/rx/Observable.html onBackpressureLatest ()
Observable.debounce
听起来就是你需要的。在下面的例子中,只有在每个200ms窗口内的可观测对象的最新发射才会被发送到观测者。
observable
.debounce(200, TimeUnit.MILLISECONDS)
.subscribe(observer);