RxJava Observable.fromEmitter奇怪的反压行为



我一直在使用Observable.fromEmitter()作为Observable.create()的一个极好的替代品。我最近遇到了一些奇怪的行为,我不太明白为什么会这样。如果有人懂反压和调度器的知识,请帮我看一下。

public final class EmitterTest {
  public static void main(String[] args) {
    Observable<Integer> obs = Observable.fromEmitter(emitter -> {
      for (int i = 1; i < 1000; i++) {
        if (i % 5 == 0) {
          sleep(300L);
        }
        emitter.onNext(i);
      }
      emitter.onCompleted();
    }, Emitter.BackpressureMode.LATEST);
    obs.subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
    sleep(10000L);
  }
  private static void sleep(Long duration) {
    try {
      Thread.sleep(duration);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

这个应用程序的输出是

Received 1
Received 2
...
Received 128

然后它仍然停留在128(大概是因为这是RxJava的默认缓冲区大小)。

如果我将fromEmitter()中指定的模式更改为BackpressureMode.NONE,则代码按预期工作。如果我删除对observeOn()的调用,它也可以按预期工作。有人能解释一下为什么会这样吗?

这是同一池死锁的情况。subscribeOn将下游的request调度到它正在使用的同一个线程上,但如果该线程忙于睡眠/发射循环,则请求永远不会交付给fromEmitter,因此一段时间后,LATEST开始删除元素,直到最后一个值(999)被交付,如果主源等待的时间足够长。(这与我们删除的onBackpressureBlock的情况类似。)

如果subscribeOn不做这个请求调度,这个例子将正常工作。

我已经打开了一个问题来找出解决方案。

目前的解决方法是使用observeOn更大的缓冲区大小(有过载)或使用fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

这并不奇怪,这是意料之中的

让我们跟踪调用。首先:

Observable.subscribe(Subscriber<? super T> subscriber)

Observable.subscribe(Subscriber<? super T> subscriber, Observable<T> observable)

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);

等等。查看:

的构造函数

OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize):

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}

如果您没有指定缓冲区,默认为RxRingBuffer.SIZE,其大小取决于平台。

这就是为什么当你调用没有缓冲区大小的observeOn运算符时,默认值是128 (Android上是16)。

这个问题的解决方案非常简单:只需使用另一个observeOn操作符并声明缓冲区大小。但是,如果将缓冲区大小声明为1000(与来自发射器的元素数量相同),程序仍然会在不发出所有值(大约170)的情况下结束。为什么?因为程序结束了。主线程在10000秒后结束,你的计算在另一个线程(Schedulers.computation())中完成。解决方法是什么?使用CountdownLatch。注意千万不要在生产环境中使用它,它只对测试有用。

相关内容

  • 没有找到相关文章

最新更新