RxJS自述中的示例似乎表明我们必须订阅源代码。换句话说:我们等待源发送事件。从这个意义上说,源似乎是基于推送的:源决定何时创建新项目。
然而,这与迭代器形成了对比,严格地说,只有在请求时,即在调用next()
时,才需要创建新项。这就是pull-based
行为,也称为懒惰生成。
例如,一个流可以返回所有素数的维基百科页面。这些项目只有在你要求的时候才会生成,因为提前生成所有项目是一项相当大的投资,而且可能只有两三个项目可以阅读。
RxJS是否也有这样的基于拉的行为,这样只有在您请求时才会生成新项目
关于背压的页面似乎表明这还不可能。
简短的回答是否定的。
RxJS是为反应式应用程序设计的,因此正如您已经提到的,如果您需要基于拉的语义,则应该使用Iterator
而不是Observable
。Observables
被设计为迭代器的基于推送的对应物,因此从算法上讲,它们确实占据了不同的空间。
显然,我不能说这种情况永远不会发生,因为这是社区会决定的。但据我所知,1)这种情况的语义不太好,2)这与对数据做出反应的想法背道而驰。
在这里可以找到一个非常好的概要。它适用于Rx.Net,但这些概念同样适用于RxJS。
您引用的页面中的受控可观察对象可以将推送可观察对象更改为拉送。
var controlled = source.controlled();
// this callback will only be invoked after controlled.request()
controlled.subscribe(n => {
log("controlled: " + n);
// do some work, then signal for next value
setTimeout(() => controlled.request(1), 2500);
});
controlled.request(1);
真正的同步迭代器是不可能的,因为它会在源不发出时阻塞。
在下面的代码段中,受控订阅者在发出信号时只获得一个项目,并且不会跳过任何值。
var output = document.getElementById("output");
var log = function(str) {
output.value += "n" + str;
output.scrollTop = output.scrollHeight;
};
var source = Rx.Observable.timer(0, 1000);
source.subscribe(n => log("source: " + n));
var controlled = source.controlled();
// this callback will only be invoked after controlled.request()
controlled.subscribe(n => {
log("controlled: " + n);
// do some work, then signal for next value
setTimeout(() => controlled.request(1), 2500);
});
controlled.request(1);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
<body>
<textarea id="output" style="width:150px; height: 150px"></textarea>
</body>
我参加聚会已经很晚了,但实际上将生成器与可观察器结合起来非常简单。您可以通过将生成器函数与可观察的源同步来从中提取值
const fib = fibonacci()
interval(500).pipe(
map(() => fib.next())
)
.subscribe(console.log)
供参考的生成器实现:
function* fibonacci() {
let v1 = 1
let v2 = 1
while (true) {
const res = v1
v1 = v2
v2 = v1 + res
yield res
}
}