当我们期望出现错误时,mergeMap如何工作?



我有一个非常具体的用例,我们有200张图片,我们想一次加载10张。在rxjs中,具有并发性的MergeMap是这种有200个HTTP请求并一次执行10个请求的用例的完美方法。但是图像可能会失败,这在我们的系统中没有问题,我们可能会尝试获取不再存在的图像并抛出HTTP错误,问题是:

在MergeMap中,如果一个可观察对象失败,我只得到错误,所有其他请求被取消或阻止。我在每一级执行中都添加了catchError,以测试我是否可以捕获它,并在不成功的情况下返回一个无错误的可观察对象。

是否有一种方法可以使用MergeMap并发,我们期望错误并执行所有错误?

我代码:

const limitedParallelObservableExecution = <T>(
listOfItems: T[],
observableMethod: (item: T) => Observable<unknown>,
maxConcurrency: number = maxParallelUploads
): Observable<unknown> => {
if (listOfItems && listOfItems.length > 0) {
const observableListOfItems: Observable<T> = from(listOfItems);
return observableListOfItems.pipe(mergeMap(observableMethod, maxConcurrency), toArray());
} else {
return of({});
}
};

我在这里执行:

limitedParallelObservableExecution<T>(queueImages, (item) =>
methodReturnsObservable().pipe(
catchError((error) => {
// Catch error and returns Observable
return of([]);
})
)
).subscribe((value) => console.log('value: ', value));

订阅中的最后一个console.log永远不会被执行。

编辑:确实,似乎' methodreturnsoobservable()'的内容是罪魁祸首:

downloadVehicleImage(imageId: string, width: number): Observable<any> {
const params = objectToActualHttpParams({
width,
noSpinner: true
});
return this.http.get(`${environment.WS_ENDPOINT_URI}/vehicles/image/${imageId}`, { responseType: 'blob', params }).pipe(
mergeMap((blob) => {
if (blob.size > 0) {
const image = new Subject();
const reader = new FileReader();
reader.readAsDataURL(blob);
reader.onloadend = () => image.next(this.domSanitizer.bypassSecurityTrustResourceUrl(reader.result as string));
return image.asObservable();
}
return of();
})
);
}

从http中移除管道。get解决了我的问题,所以我会调试这个,看看有什么问题,谢谢!

你写的东西对我来说很有用。我猜你的错误在别处。methodreturnsoobservable必须完成,否则会出错。如果它们中的任何一个保持打开状态,toArray()将永远不会发出。

检查的一种方法是使用超时

function limitedParallelObservableExecution<T, R>(
listOfItems: T[],
observableMethod: (item: T) => Observable<R>,
maxConcurrency: number = maxParallelUploads
): Observable<R[]> {
return from(listOfItems || []).pipe(
mergeMap(
item => observableMethod(v).pipe(
timeout({ each: 5000 }),
catchError(_ => EMPTY)
),
maxConcurrency
),
toArray()
);
}

最新更新