Angular处理多个依赖订阅



有什么帮助吗?

let notificationsMessages = []
countries.forEach((country: any) => {
this.isActiveCountry(country.isActive).subscribe((data) => { // // CALL #1 TO API
country.serverId = data.serverId;
this.uploadPhotoToApi(country.fileSource).subscribe((response) => { // CALL #2 TO API
// server return the uploaded file ID
country.serverFileID = response.serverFileId;
this.sendCountryToApi(country).subscribe((response) => { // CALL #3 TO API
this.countriesTable.delete(country.id).then(() => {
// Delete the uploaded country from local database
// if is the last country EMIT EVENT
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // Error to delete country from indexedDB 
});
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // // Error to upload country to API
});
}, (errorCode) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // Error on sending file to API
});
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // // Error on country identification
});
});

当所有country列表都被处理时,我如何发出一个事件?我需要知道有多少国家上传成功了,有多少没有。

例如,如果我有一个50个国家的列表,当最后一个被处理时,我想要发出一个带有两个数组的事件…像这样:成功:[countryId1, countryId2…]错误:['国家Id 2上传失败','国家Id 10上传文件失败']

所有这3个调用都是依赖的,必须按照这个顺序执行…我无法改变这种流动。我应该在CALL #3成功和所有错误函数上发出事件吗?谢谢!

这里有一种方法。这可能是过度的,因为它为您提供了许多对错误处理的粒度控制,但基本上总是以相同的方式处理错误。

即便如此,这可能比最直接的解决方案更容易扩展。

:

interface TaggedCountry{
success: boolean,
country: any,
error?: any
}
class ArbiratryClassName {
processCountry(country: any): Observable<TaggedCountry>{
return this.isActiveCountry(country.isActive).pipe(
// country now has serverId set
map(({serverId}) => ({...country, serverId})),
catchError(error => throwError(() => ({
success: false,
country,
error
}) as TaggedCountry)),
mergeMap((resultCountry: any) => this.uploadPhotoToApi(resultCountry.fileSource).pipe(
// country now has serverFileId set
map(({serverFileId}) => ({...resultCountry, serverFileId})),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
mergeMap((resultCountry: any) => this.sendCountryToApi(resultCountry).pipe(
// Ignore response from sendCountryToApi
mapTo(resultCountry),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
mergeMap((resultCountry: any) => from(this.countriesTable.delete(resultCountry.id)).pipe(
// Ignore response from countriesTable.delete
mapTo(resultCountry),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
map((resultCountry: any) => ({
success: true,
country: resultCountry
}) as TaggedCountry),
// Convert errors into regular emissions
catchError((tagged:TaggedCountry) => of(tagged))
);
}
processCountries(countries: any[]): Observable<{success: TaggedCountry[], errors: TaggedCountry[]}>{
return forkJoin(countries.map(c => this.processCountry(c))).pipe(
map((tagged: TaggedCountry[]) => ({
success: tagged.filter(tag => tag.success),
errors: tagged.filter(tag => !tag.success)
}))
)
}
doSomethingWith(countries: any[]): void {
this.processCountries(countries).subscribe({
next: countries => console.log("All countries processed. Result: ", countries),
complete: () => console.log("There's only one emission, so this should get called immediately after .next() was called"),
error: err => console.log("This is a surprise, is there an error we didn't catch earlier? Error: ", err)
})
}
}

如果看到同样的事情以不同的方式完成是有帮助的,这里是processCountry的一个更短的实现

processCountry(country: any): Observable<TaggedCountry>{
return this.isActiveCountry(country.isActive).pipe(
tap((res:any) => country.serverId = res.serverId),
switchMap(_ => this.uploadPhotoToApi(country.fileSource)),
tap((res:any) => country.serverFileId = res.serverFileId),
switchMap(_ => this.sendCountryToApi(country)),
switchMap(_ => this.countriesTable.delete(country.id)),
// Tag our result as a success
map(_ => ({
success: true,
country
}) as TaggedCountry),
// Tag our errors and convert errors into regular emissions
catchError(error => of(({
success: false,
country,
error
}) as TaggedCountry))
);
}

尽量避免在彼此内部嵌套许多.subscribe(的诱惑。正如@praveen-soni所提到的,switchMap可以帮助解决这个问题。

然后当所有国家都被处理时获得状态,我认为forkJoin是完美的:它接受一个可观测列表,一旦它们全部完成,就会排放。

如何构建可观察对象列表?你最初有一个国家列表,你可以把每个国家映射到处理那个国家的可观察对象。我们也可以使用一个单独的catchError,这样错误就不会关闭整个流,而只关闭特定国家的一个。

我觉得应该是这样的:

const result$ = forkJoin(
countries.map((country) =>
this.isActiveCountry(country.isActive).pipe(
switchMap((data) => {
country.serverId = data.serverId;
return this.uploadPhotoToApi(country.fileSource);
}),
switchMap((response) => {
country.serverFileId = response.serverFileId;
return this.sendCountryToApi(country);
}),
switchMap(() => {
return this.countriesTable.delete(country.id);
}),
map(() => {
// Everything went OK, map it to an OK status
return {
type: "success",
};
}),
catchError((error) => {
// If any step fails, map it to an error type
return of({
type: "error",
error,
});
}),
take(1) // Make sure the observable completes
)
)
);
// result$ can now be used as any other observable
result$.subscribe(result => {
console.log(result);
})

最新更新