未完成的可观察量的分叉联接替代方案?


constructor(
private route: ActivatedRoute,
private http: Http
){
// Observe parameter changes
let paramObs = route.paramMap;
// Fetch data once
let dataObs = http.get('...');
// Subscribe to both observables,
// use both resolved values at the same level
}

是否有类似于forkJoin的东西会在发出参数更改时触发?forkJoin仅在所有可观察量都完成后才有效。

我只需要避免回调地狱,欢迎任何符合的替代方案。

有几个选项:

  1. take(1)forkJoin()一起使用以完成每个源 可观察:

    forkJoin(o1$.take(1), o2$.take(1))
    
  2. 仅当所有源可观察量发出相同数量的项目时,才使用zip()take(1)发出:

    zip(o1$, o2$).take(1)
    
  3. 当任何源可观察量发出时,使用combineLatest()发出:

    combineLatest(o1$, o2$)
    

2019 年 1 月:针对 RxJS 6 进行了更新

一个小技巧,用于避免在任何一个可观察订阅失败时中断可观察订阅。


import { throwError, of, forkJoin } from "rxjs";
import { catchError, take } from "rxjs/operators";
//emits an error with specified value on subscription
const observables$ = [];
const observableThatWillComplete$ = of(1, 2, 3, 4, 5).pipe(take(1));
const observableThatWillFail$ = throwError(
"This is an error hence breaking the stream"
).pipe(catchError((error) => of(`Error Catched: ${error}`)));
observables$.push(observableThatWillComplete$, observableThatWillFail$);
forkJoin(observables$).subscribe(responses => {
console.log("Subscribed");
console.log(responses);
});

除了其他答案之外,请考虑使用将按顺序处理每个可观察量的Observable.concat()。下面是一个示例:

const getPostOne$ = Rx.Observable.timer(3000).mapTo({id: 1});
const getPostTwo$ = Rx.Observable.timer(1000).mapTo({id: 2});
Rx.Observable.concat(getPostOne$, getPostTwo$).subscribe(res => console.log(res));

一篇好文章概述了您应该知道的 6 个运算符。

最新更新