Rxjava Scheduler.trampoline versus concatmap



似乎基于文档,Scheduler.trampoline确保元素发出先进先出(即按顺序(。 似乎Concat Map 的重点是确保所有内容都正确排列然后发出。所以我想知道应用订阅On./.observeOn(Scheduler.trampoline(((然后执行concatmap运算符而不是常规映射操作是否有所有意义。

是的,有一点。举个例子:

Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.trampoline())
.flatMap(
a -> {
if (a < 3) {
return Observable.just(a).delay(3, TimeUnit.SECONDS);
} else {
return Observable.just(a);
}
})
.doOnNext(
a -> System.out.println("Element: " + a + ", on: " + Thread.currentThread().getName()))
.subscribe();

下面是输出:

Element: 3, on: main
Element: 4, on: main
Element: 5, on: main
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2

这里发生的事情是 1 和 2 依次到达flatMap运算符。但是现在,这些元素的内部流延迟了 3 秒。请注意,flatMap急切地subscribes到内部流。也就是说,它不会等待一个流完成(带有onComplete(,然后再subscribing到下一个内部流(这就是concatMap所做的(。

因此,1 和 2 的内部流延迟了 3 秒。可以说这是一个需要一点时间的外部 I/O 调用。同时,接下来的 3 个元素 (3,4,5( 进入flatMap,它们的流立即完成。这就是您在输出中看到维护的序列的原因。

然后 3 秒过去,元素 1 和 2 被发射。请注意,不能保证 1 会先于 2。

现在将flatMap替换为concatMap,您将看到该序列保持不变:

Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
Element: 3, on: RxComputationScheduler-2
Element: 4, on: RxComputationScheduler-2
Element: 5, on: RxComputationScheduler-2

为什么?因为这就是concatMap的工作方式。元素 1 来了,用于 I/O 调用。与其内流对应的内流发出onComplete需要 3 秒。与其余元素对应的内部流不会由concatMap订阅,直到第一个流发出 onComplete。一旦它这样做,下一个流(Observable.just(2).delay(3, TimeUnit.SECONDS)(被subscribed到,依此类推。因此,您可以看到订单是如何维持的。

关于这两个运算符,您需要记住的是:当元素到达时,flatMap急切地subscribes内部流。另一方面,concatMap等待一个流完成,然后再subscribes下一个流。这就是为什么您无法与concatMap进行并行调用的原因。

不是真的。trampoline实质上是在以 FIFO 顺序调用其Worker.schedule方法的线程之一上执行工作。

Observable.subscribeOn(Schedulers.trampoline())的情况下,它将是线程订阅,因此应用它没有实际效果。

Observable.observeOn(Schedulers.trampoline())的情况下,它将是线程信号项目,因此那里也没有实际效果。

concatMap在向上游项发出信号的线程或内部Observable完成的线程上执行映射器函数。运营商基本上已经有一个内置的蹦床,因此上游项目和下游完成不会重叠。在 3.x 中,将有一个过载占用SchedulerSchedulers.trampoline()也不会产生实际效果。

Schedulers.trampoline()的最佳用例是在不需要异步的单元测试中。因此,您可以参数化subscribeOn/observeOn用法,或者使用调度程序钩子并替换标准调度程序:

RxJavaPlugins.setComputationSchedulerHandler(s -> Schedulers.trampoline());
RxJavaPlugins.setIoSchedulerHandler(s -> Schedulers.trampoline());
RxJavaPlugins.setNewThreadSchedulerHandler(s -> Schedulers.trampoline());

然后一旦你完成,

RxJavaPlugins.reset();

相关内容

  • 没有找到相关文章

最新更新