时间内产生可观察值的最惯用方法是什么?例如,假设我有一个从大数组创建的可观察量,并且我想每 2 秒生成一个值。interval
和selectMany
的结合是最好的方法吗?
对于您的特定示例,这个想法是将数组中的每个值映射到一个可观察量,该可观察量将在延迟后产生其结果,然后连接生成的可观察量流:
var delayedStream = Rx.Observable
.fromArray([1, 2, 3, 4, 5])
.map(function (value) { return Rx.Observable.return(value).delay(2000); })
.concatAll();
其他例子可能确实利用timer
或interval
。 这要看情况。
例如,如果你的数组真的很大,那么上面的内容会导致相当大的内存压力(因为它正在为一个非常大的N
创建N
可观察量)。 下面是一个使用 interval
懒惰地遍历数组的替代方法:
var delayedStream = Rx.Observable
.interval(2000)
.take(reallyBigArray.length) // end the observable after it pulses N times
.map(function (i) { return reallyBigArray[i]; });
这将每 2 秒从数组中生成下一个值,直到它迭代整个数组。
zip 会产生更好、更具可读性的代码,仍然只使用 3 个可观察量。
var items = ['A', 'B', 'C'];
Rx.Observable.zip(
Rx.Observable.fromArray(items),
Rx.Observable.timer(2000, 2000),
function(item, i) { return item;}
)
对于 RxJS v6 获取下一个延迟 2 秒。
示例 1. concatMap:
import {of} from 'rxjs';
import {concatMap, delay} from 'rxjs/operators';
of(1, 2, 3, 4, 5)
.pipe(
concatMap(x => of(x)
.pipe(
delay(2000))
)
)
.subscribe({
next(value) {
console.log(value);
}
});
示例 2. 映射 + 连接全部:
import {of} from 'rxjs';
import {concatAll, delay, map} from 'rxjs/operators';
of(1, 2, 3, 4, 5)
.pipe(
map(x => of(x)
.pipe(
delay(2000))
),
concatAll()
)
.subscribe({
next(value) {
console.log(value);
}
});
<</div>
div class="one_answers"> 虽然布兰登的回答得到了这个想法的要点,但这里有一个版本,它立即产生第一项,然后在随后的项之间放置时间。
var delay = Rx.Observable.empty().delay(2000);
var items = Rx.Observable.fromArray([1,2,3,4,5])
.map(function (x) {
return Rx.Observable.return(x).concat(delay); // put some time after the item
})
.concatAll();
针对较新的 RxJS 进行了更新:
var delay = Rx.Observable.empty().delay(2000);
var items = Rx.Observable.fromArray([1,2,3,4,5])
.concatMap(function (x) {
return Rx.Observable.of(x).concat(delay); // put some time after the item
});
对于 RxJS 5:
Rx.Observable.from([1, 2, 3, 4, 5])
.zip(Rx.Observable.timer(0, 2000), x => x)
.subscribe(x => console.log(x));
同意zip是一种干净的方法。下面是一个可重用的函数,用于为数组生成间隔流:
function yieldByInterval(items, time) {
return Rx.Observable.from(items).zip(
Rx.Observable.interval(time),
function(item, index) { return item; }
);
}
// test
yieldByInterval(['A', 'B', 'C'], 2000)
.subscribe(console.log.bind(console));
这建立在farincz的答案之上,但通过使用.zip
作为实例方法,时间略短。
另外,我使用了Rx.Observable.from()
Rx.Observable.fromArray()
因为它已被弃用。
由于没有提到这一点,我认为concatMap
与delay
相结合是相当可读的。
Rx.Observable.fromArray([1, 2, 3, 4, 5])
.concatMap(x => Rx.Observable.of(x).delay(1000));
见 https://codepen.io/jmendes/pen/EwaPzw
基于farincz和user3587412的zip解决方案,以下是它在RxJS v6中的工作方式
。const { zip, from, timer } = require("rxjs")
const { map } = require("rxjs/operators")
const input = [1, 2, 3, 4, 5]
const delay = 2000
zip(
from(input),
timer(0, delay)
).pipe(
map(([ delayedInput, _timer ]) => delayedInput) // throw away timer index
).subscribe(
console.log
)
//create a custom operator
const delayEach=(millis)=>(o)=>o.pipe(concatMap((x)=>of(x).pipe(delay(millis))))
of(1, 2, 3, 4, 5)
.pipe(delayEach(1000))
.subscribe(console.log);
RxJs 6 代码,它立即发出第一项并延迟其余项:
import { of, EMPTY, concat } from "rxjs";
import { concatMap, delay } from "rxjs/operators";
const delayed$ = EMPTY.pipe(delay(1000));
console.log("start");
of(1, 2, 3, 4)
.pipe(concatMap(v => concat(of(v), delayed$)))
.subscribe({
next: console.log
});
全堆栈闪电战示例
想法:
- 对于每个项目,我们创建一个可观察量(使用
concat
),它将立即输出该项目(of(v)
),然后在延迟后发出可观察EMPTY
- 由于我们使用
concatMap
所有发出的可观察量都将以正确的顺序发出
一个简单的单行:
const delayMs = 2000
from([1, 2, 3]).pipe(concatMap(x => of(x).pipe(delay(delayMs)))).subscribe(item => {
});