如何在 Rxjava2 中获取有关背压的实际最新事件?Flowable.onBackpressureLatest() 未



当生产者生成事件的速度快于客户消费的速度时。

我认为使用带有onBackpressureLatest()的Flowable,我可以发出最新的事件。

但事实证明,有一个大小为 128 的默认缓冲区。我得到的是之前缓冲的日期事件。

那么如何获得实际的最新事件呢?

下面是示例代码:

Flowable.interval(40, TimeUnit.MILLISECONDS)
.doOnNext{
println("doOnNext $it")
}
.onBackpressureLatest()
.observeOn(Schedulers.single())
.subscribe {
println("subscribe $it")
Thread.sleep(100)
}

我所期望的:

doOnNext    0
subscribe   0
doOnNext    1
doOnNext    2
subscribe   2
doOnNext    3
doOnNext    4
doOnNext    5
subscribe   5
doOnNext    6
doOnNext    7
subscribe   7
doOnNext    8
doOnNext    9
doOnNext    10
subscribe   10
...

我得到了什么:

doOnNext    0
subscribe   0
doOnNext    1
doOnNext    2
subscribe   1
doOnNext    3
doOnNext    4
doOnNext    5
subscribe   2
doOnNext    6
doOnNext    7
subscribe   3
doOnNext    8
doOnNext    9
doOnNext    10
subscribe   4
...
doOnNext    325
subscribe   127
doOnNext    326
doOnNext    327
doOnNext    328
subscribe   246
...

您的问题实际上在于observeOn,默认情况下最多请求 128 个项目并将此请求传递到backpressureLatest,因此它的行为不符合您的预期。

可以使用.observeOn(Scheduler,boolean,int)指定缓冲区大小,该大小应修复您看到的行为。

相关内容

  • 没有找到相关文章

最新更新