如果内部Observable在Observable的Observable流中失败,如何保持接收值



在这种情况下,我有一个Observable of Observable,如下所示:

public outerFunction(
collections: someObject
): Observable<someOtherObject> {
const outerObservable$ = new Observable<Observable<someOtherObject>>(
(observer) => {
const collKeys = Object.keys(collections);
for (const id of collKeys) {
if (collections[id]) {
const innerObs$ = this.functionThatReturnsObs(
collections[id]
)
observer.next(innerObs$);
}
}
}
);
return outerObservable$.pipe(mergeAll(1));
}

functionThatReturnsObs进行Http调用以检索一些数据,

我的问题是;如果其中一个调用失败,我不会从其他http调用中获得任何其他数据,就好像流被中断了一样。

我想做一些类似的事情:

this.outerFunction(collections).piep(
map((data) => /* do something with this data*/),
catchError((error) => /* in case of failure of one of the http calls do something */ ));

更新

,呼叫observer.next(innerObs$.pipe(() => empty());似乎对我不起作用

但我设法通过从outerFunction返回Observable<Observable<someOtherObject>>,然后像这样使用mergeMap来获得所需的行为:

public outerFunction(
collections: someObject
):Observable<Observable<someOtherObject>> {
const outerObservable$ = new Observable<Observable<someOtherObject>>(
(observer) => {
const collKeys = Object.keys(collections);
for (const id of collKeys) {
if (collections[id]) {
const innerObs$ = this.functionThatReturnsObs(
collections[id]
)
observer.next(innerObs$);
}
}
}
);
return outerObservable$;
}

然后:

this.outerFunction(collections).pipe(
mergeMap((innerObs$) => innerObs$.pipe(
map((data) => /* do something with this data*/),
catchError((err) => /* in case of err handle the faild HTTP call */)
)),
);

但我不知道为什么会这样,有人能解释为什么吗?

您已经有了使用catchError的正确想法。为了防止外部可观测到的物质消亡,只需将其添加到内部可观测到物质中即可。例如

observer.next(innerObs$.pipe(catchError(() => EMPTY)));

最新更新