仓促的分叉连接替代 rxjs 用于可观察链接?



我有 5 个不同的 API 调用要做,它们现在都链接在 forkJoin 中。 我的新要求是订阅应该在任何新的可观察解决方案时触发。

rxjs 中是否有任何运算符或任何其他技巧,我可以在其中保持链接,但是每次任何可观察到的解决时都应该触发?

forkJoin(
this.myService.api1(),
this.myService.api2(),
this.myService.api3(),
this.myService.api4(),
this.myService.api5()
)
.subscribe(
([r1,r2,r3,r4,r5]) => { ... do something })

您可以使用combineLatest从每个可观察的源发出最新值。在每个可观察源至少发出一次之前,它不会发出,因此您可以使用startWith来提供起始值:

combineLatest(
this.myService.api1().pipe(startWith(null)),
this.myService.api2().pipe(startWith(null)),
this.myService.api3().pipe(startWith(null)),
this.myService.api4().pipe(startWith(null)),
this.myService.api5().pipe(startWith(null))
)
.subscribe(
([r1,r2,r3,r4,r5]) => { ... do something })

初始输出将为[null, null, null, null, null]。当每个可观察对象发出时,它将替换数组中相应的null值。

如果要忽略初始发射,可以使用skip(1)

const sourceOne = of('Hello').pipe(delay(1000));
const sourceTwo = of('World!').pipe(delay(2000));
const sourceThree = of('Goodbye').pipe(delay(3000));
const sourceFour = of('World!').pipe(delay(4000));
//wait until all observables have emitted a value then emit all as an array
const example = combineLatest(
sourceOne.pipe(startWith(null)),
sourceTwo.pipe(startWith(null)),
sourceThree.pipe(startWith(null)),
sourceFour.pipe(startWith(null))
)
.pipe(skip(1));
//output:
//["Hello", null, null, null]
//["Hello", "World!", null null]
//["Hello", "World!", "Goodbye", null]
//["Hello", "World!", "Goodbye", "World!"]
//Complete
const subscribe = example.subscribe(val => console.log(val), null, () => console.log('Complete'));

这里有一个StackBlitz来尝试一下。

您可以使用merge像forkJoin一样同时执行可观察量,但立即发出它们的值。要跟踪顺序,请使用map将可观察量的索引添加到其输出中。使用scan跟踪以前的值,在数组中的正确位置插入当前值并发出累积数据。

export function forkJoinEarly(...sources: Observable<any>[]): Observable<any[]> {
return merge(...sources.map((obs, index) => obs.pipe(
// optional: only emit last value like forkJoin
last(), 
// add the index of the observable to the output
map(value => ({ index, value })) 
))).pipe(
// use scan to keep track of previous values and insert current values
scan((acc, curr) => (acc[curr.index] = curr.value, acc), Array(sources.length).fill(undefined))
);
}

https://stackblitz.com/edit/rxjs-gwch8m

最新更新