我一直在使用Observable.fromEmitter()
作为Observable.create()
的一个极好的替代品。我最近遇到了一些奇怪的行为,我不太明白为什么会这样。如果有人懂反压和调度器的知识,请帮我看一下。
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
这个应用程序的输出是
Received 1
Received 2
...
Received 128
然后它仍然停留在128(大概是因为这是RxJava的默认缓冲区大小)。
如果我将fromEmitter()
中指定的模式更改为BackpressureMode.NONE
,则代码按预期工作。如果我删除对observeOn()
的调用,它也可以按预期工作。有人能解释一下为什么会这样吗?
这是同一池死锁的情况。subscribeOn
将下游的request
调度到它正在使用的同一个线程上,但如果该线程忙于睡眠/发射循环,则请求永远不会交付给fromEmitter
,因此一段时间后,LATEST
开始删除元素,直到最后一个值(999)被交付,如果主源等待的时间足够长。(这与我们删除的onBackpressureBlock
的情况类似。)
如果subscribeOn
不做这个请求调度,这个例子将正常工作。
我已经打开了一个问题来找出解决方案。
目前的解决方法是使用observeOn
更大的缓冲区大小(有过载)或使用fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()
这并不奇怪,这是意料之中的
让我们跟踪调用。首先:
Observable.subscribe(Subscriber<? super T> subscriber)
Observable.subscribe(Subscriber<? super T> subscriber, Observable<T> observable)
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
等等。查看:
的构造函数 OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize)
:
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
如果您没有指定缓冲区,默认为RxRingBuffer.SIZE
,其大小取决于平台。
这就是为什么当你调用没有缓冲区大小的observeOn
运算符时,默认值是128 (Android上是16)。
这个问题的解决方案非常简单:只需使用另一个observeOn
操作符并声明缓冲区大小。但是,如果将缓冲区大小声明为1000(与来自发射器的元素数量相同),程序仍然会在不发出所有值(大约170)的情况下结束。为什么?因为程序结束了。主线程在10000秒后结束,你的计算在另一个线程(Schedulers.computation()
)中完成。解决方法是什么?使用CountdownLatch
。注意千万不要在生产环境中使用它,它只对测试有用。