如何正确地对PublishSubject应用背压



我有一个PublishSubject,它会发出位置更新(LatLng(:

val location = PublishSubject.create<LatLng>()

现在,我想对这个位置做点什么,这可能需要一些时间,应该按顺序进行:

location
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribe {
// ... CPU-heavy operation
}

只有在subscribe中的每个操作完成后,才能运行下一个操作。此外,每次更新都会使前一次过时,所以我只对最新的值感兴趣。因此,订阅者在每次更新时应该只接收当前最新的值。因此,我想到了应用背压:

location
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.toFlowable(BackpressureStrategy.LATEST)
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})

不幸的是,这不起作用,因为每个发出的LatLng都会被传递给订阅者,并且不会跳过任何值

问题是observeOn总是可以缓冲至少一个元素。您可以通过delay实现所需的效果,尽管:

location
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.SECONDS, Schedulers.computation())
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})

附加备注:

  1. subscribeOnSubject没有实际效果,可以忽略
  2. toFlowable最好应用在靠近源的位置,以避免delay被不必要的项目淹没
  3. 如果你不介意第三方运营商,有一个observeOnLatest运营商可以做同样的事情

最新更新