RxJava: onBackpressureBlock()行为奇怪



我正在使用RxJava(确切地说是RxKotlin)。这里我有以下Observable s:

fun metronome(ms: Int) = observable<Int> {
    var i = 0;
    while (true) {
        if (ms > 0) {
            Thread.sleep(ms.toLong())
        }
        if (it.isUnsubscribed()) {
            break
        }
        it.onNext(++i)
    }
}

我想有几个合并并并发运行。它们忽略反压,因此必须对它们应用反压操作符。

创建

val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)

然后合并metronome s

val o = Observable.merge(listOf(metronome(0),
                                metronome(1000).map { "---------" })
                         .map { it.onBackpressureBlock().subscribeOn(scheduler) })
                  .take(5000, TimeUnit.MILLISECONDS)

第一个应该不断地发射物品。如果我在运行的最后3秒这样做,我会得到以下输出:

...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------

似乎在同一个线程上订阅了Observable s,并且第一个可观察对象被阻塞了3+秒。

但是当我交换onBackpressureBlock()subscribeOn(scheduler)调用时,输出变成了我所期望的,输出在整个执行过程中被合并。

对我来说,调用顺序在RxJava中很重要,但我不太明白在这种特殊情况下会发生什么。

那么,当 subscribeOn之前应用onBackpressureBlock运算符时会发生什么,如果之后又会发生什么呢?

onBackpressureBlock操作符是一个失败的实验;它需要注意应用的地方。例如,subscribeOn().onBackpressureBlock()可以工作,但不能反过来。

RxJava有一个叫做interval的非阻塞周期计时器,所以你不需要自己滚动。

相关内容

  • 没有找到相关文章

最新更新