在这种情况下,我有一个Observable of Observable,如下所示:
public outerFunction(
collections: someObject
): Observable<someOtherObject> {
const outerObservable$ = new Observable<Observable<someOtherObject>>(
(observer) => {
const collKeys = Object.keys(collections);
for (const id of collKeys) {
if (collections[id]) {
const innerObs$ = this.functionThatReturnsObs(
collections[id]
)
observer.next(innerObs$);
}
}
}
);
return outerObservable$.pipe(mergeAll(1));
}
functionThatReturnsObs
进行Http调用以检索一些数据,
我的问题是;如果其中一个调用失败,我不会从其他http调用中获得任何其他数据,就好像流被中断了一样。
我想做一些类似的事情:
this.outerFunction(collections).piep(
map((data) => /* do something with this data*/),
catchError((error) => /* in case of failure of one of the http calls do something */ ));
更新
,呼叫observer.next(innerObs$.pipe(() => empty());
似乎对我不起作用
但我设法通过从outerFunction
返回Observable<Observable<someOtherObject>>
,然后像这样使用mergeMap
来获得所需的行为:
public outerFunction(
collections: someObject
):Observable<Observable<someOtherObject>> {
const outerObservable$ = new Observable<Observable<someOtherObject>>(
(observer) => {
const collKeys = Object.keys(collections);
for (const id of collKeys) {
if (collections[id]) {
const innerObs$ = this.functionThatReturnsObs(
collections[id]
)
observer.next(innerObs$);
}
}
}
);
return outerObservable$;
}
然后:
this.outerFunction(collections).pipe(
mergeMap((innerObs$) => innerObs$.pipe(
map((data) => /* do something with this data*/),
catchError((err) => /* in case of err handle the faild HTTP call */)
)),
);
但我不知道为什么会这样,有人能解释为什么吗?
您已经有了使用catchError
的正确想法。为了防止外部可观测到的物质消亡,只需将其添加到内部可观测到物质中即可。例如
observer.next(innerObs$.pipe(catchError(() => EMPTY)));