我正在使用RxJava(确切地说是RxKotlin)。这里我有以下Observable
s:
fun metronome(ms: Int) = observable<Int> {
var i = 0;
while (true) {
if (ms > 0) {
Thread.sleep(ms.toLong())
}
if (it.isUnsubscribed()) {
break
}
it.onNext(++i)
}
}
我想有几个合并并并发运行。它们忽略反压,因此必须对它们应用反压操作符。
创建
val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)
然后合并metronome
s
val o = Observable.merge(listOf(metronome(0),
metronome(1000).map { "---------" })
.map { it.onBackpressureBlock().subscribeOn(scheduler) })
.take(5000, TimeUnit.MILLISECONDS)
第一个应该不断地发射物品。如果我在运行的最后3秒这样做,我会得到以下输出:
...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
似乎在同一个线程上订阅了Observable
s,并且第一个可观察对象被阻塞了3+秒。
但是当我交换onBackpressureBlock()
和subscribeOn(scheduler)
调用时,输出变成了我所期望的,输出在整个执行过程中被合并。
对我来说,调用顺序在RxJava中很重要,但我不太明白在这种特殊情况下会发生什么。
那么,当在subscribeOn
之前应用onBackpressureBlock
运算符时会发生什么,如果在之后又会发生什么呢? onBackpressureBlock
操作符是一个失败的实验;它需要注意应用的地方。例如,subscribeOn().onBackpressureBlock()
可以工作,但不能反过来。
RxJava有一个叫做interval
的非阻塞周期计时器,所以你不需要自己滚动。