我希望以下代码会异步运行:
var range = Rx.Observable.range(0, 3000000);
range.subscribe(
function(x) {},
function(err) {},
function() {
console.log('Completed');
});
console.log('Hello World');
但事实并非如此。需要一段时间才能浏览大量数字,只有完成执行才能恢复执行,您可以在此处尝试代码。
我对何时期望RXJ同步或异步行为感到困惑。这取决于使用的方法吗?我以前的想法是,一旦我们进入可观察/观察者的土地,它中的一切都异步运行,类似于承诺的工作方式。
rxjs遵循与rx.net相同的规则。默认情况下,每个可观察的操作员使用完成其工作所需的最小异步性。在这种情况下,Range
可以同步贯穿数字,因此它可以(其文档告诉您它将使用Rx.Scheduler.currentThread
默认使用。
如果您想引入比操作所需的更多异步性,则需要告诉它使用其他调度程序。
要获得您期望的行为,您想使用Rx.Scheduler.timeout
。从本质上讲,这将导致它通过setTimeout
安排每次迭代。(实际上并不是那么简单,调度程序将使用浏览器中最快的方法来安排延期工作)。
var range = Rx.Observable.range(0, 3000000, Rx.Scheduler.timeout);
更新了JSFIDDLE
请注意,通过setTimeout
迭代300万个数字将几乎永远使用。因此,也许我们想以1,000的批量处理它们。因此,在这里,我们将利用Range
同步运行的默认行为,然后批量批处理并使用observeOn
通过我们的超时调度程序运行批处理:
var range = Rx.Observable
.range(0, 3000000)
.bufferWithCount(1000)
.observeOn(Rx.Scheduler.timeout) // run each buffer via setTimeout
.select(function (buffer, i) {
console.log("processing buffer", i);
return Rx.Observable.fromArray(buffer);
})
.concatAll(); // concat the buffers together
JSFIDDLE指出,一开始有一个延迟,而range
曲柄通过所有3,000,000个值,bufferWithCount
产生3,000个数组。对于真实的生产代码而言,这种东西是不寻常的,因为您的数据源不如Observable.range
。
fyi承诺在这方面没有什么不同。如果您在已经完成的承诺上调用then
,则then
功能可能会同步运行。所有的承诺和可观察到的事情都真正存在一个接口,您可以通过该界面提供可以提供在满足条件时可以保证运行的回调,无论是否已经满足或以后会满足条件。然后,RXJS提供了许多机制,如果您真的想要这种方式,则可以强迫某些东西不同步。和引入特定时间安排的方法。