RXJS中的同步性



我希望以下代码会异步运行:

var range = Rx.Observable.range(0, 3000000);
range.subscribe(
  function(x) {},
  function(err) {},
  function() {
    console.log('Completed');
});
console.log('Hello World');

但事实并非如此。需要一段时间才能浏览大量数字,只有完成执行才能恢复执行,您可以在此处尝试代码。

我对何时期望RXJ同步或异步行为感到困惑。这取决于使用的方法吗?我以前的想法是,一旦我们进入可观察/观察者的土地,它中的一切都异步运行,类似于承诺的工作方式。

rxjs遵循与rx.net相同的规则。默认情况下,每个可观察的操作员使用完成其工作所需的最小异步性。在这种情况下,Range可以同步贯穿数字,因此它可以(其文档告诉您它将使用Rx.Scheduler.currentThread默认使用。

如果您想引入比操作所需的更多异步性,则需要告诉它使用其他调度程序。

要获得您期望的行为,您想使用Rx.Scheduler.timeout。从本质上讲,这将导致它通过setTimeout安排每次迭代。(实际上并不是那么简单,调度程序将使用浏览器中最快的方法来安排延期工作)。

var range = Rx.Observable.range(0, 3000000, Rx.Scheduler.timeout);

更新了JSFIDDLE

请注意,通过setTimeout迭代300万个数字将几乎永远使用。因此,也许我们想以1,000的批量处理它们。因此,在这里,我们将利用Range同步运行的默认行为,然后批量批处理并使用observeOn通过我们的超时调度程序运行批处理:

var range = Rx.Observable
    .range(0, 3000000)
    .bufferWithCount(1000)
    .observeOn(Rx.Scheduler.timeout) // run each buffer via setTimeout
    .select(function (buffer, i) {
       console.log("processing buffer", i);
       return Rx.Observable.fromArray(buffer);
     })
    .concatAll(); // concat the buffers together

JSFIDDLE指出,一开始有一个延迟,而range曲柄通过所有3,000,000个值,bufferWithCount产生3,000个数组。对于真实的生产代码而言,这种东西是不寻常的,因为您的数据源不如Observable.range

fyi承诺在这方面没有什么不同。如果您在已经完成的承诺上调用then,则then功能可能会同步运行。所有的承诺和可观察到的事情都真正存在一个接口,您可以通过该界面提供可以提供在满足条件时可以保证运行的回调,无论是否已经满足或以后会满足条件。然后,RXJS提供了许多机制,如果您真的想要这种方式,则可以强迫某些东西不同步。和引入特定时间安排的方法。

相关内容

  • 没有找到相关文章