如何使用RxJS调用具有时间间隔的多个依赖api调用



我正试图在angular 11中为这样的场景编写一个代码-

我有文件列表,对于我命中的每个文件(比如api1(,我从响应中获取一个fileId,并将其传递给另一个api(比如api2(,我想继续每3秒命中一次api2,除非我没有得到状态=";可用";在回应中。一旦我得到可用状态,我就不需要再点击该fileId的api2,我们就可以开始处理循环中的下一个文件了。

我拥有的每个文件的整个过程。

我知道我们可以使用rxjs操作符来实现这一点,比如mergeMap或switchMap(因为序列现在对我来说并不重要(。但我对rxjs很陌生,不知道如何将其组合在一起。

这就是我现在正在做的事情-

this.filesToUpload.forEach((fileItem) => {
if (!fileItem.uploaded) {
if (fileItem.file.size < this.maxSize) {
self.fileService.translateFile(fileItem.file).then( //hit api1
(response) => {
if (response && get(response, 'status') == 'processing') {
//do some processing here 
this.getDocumentStatus(response.fileId);
} 
},
(error) => {
//show error
}
);
}
}
}); 
getDocumentStatus(fileId:string){
this.docStatusSubscription = interval(3000)   //hitting api2 for every 3 seconds 
.pipe(takeWhile(() => !this.statusProcessing))
.subscribe(() => {
this.statusProcessing = false;
this.fileService.getDocumentStatus(fileId).then((response)=>{
if(response.results.status=="available"){
this.statusProcessing = true;
//action complete for this fileId
}
},(error)=>{
});

})

}

以下是在描述了您想要的内容后,我可以如何做到这一点。

  1. 创建一个要进行的所有调用的可观测值列表
  2. 将列表连接在一起
  3. 订阅

实现这一功能的原因是,我们只订阅一次(而不是每个文件订阅一次(,并且我们让运营商处理其他所有内容的订阅和取消订阅。

然后什么都不会发生,直到我们订阅。这样一来,concat就可以为我们完成繁重的任务。我们自己不需要用this.statusProessing之类的变量来跟踪任何事情。这一切都由我们来处理!这样就不那么容易出错。

// Create callList. This is an array of observables that each hit the APIs and only
// complete when status == "available".
const callList = this.filesToUpload
.filter(fileItem => !fileItem.uploaded && fileItem.file.size < this.maxSize)
.map(fileItem => this.createCall(fileItem));
// concatenate the array of observables by running each one after the previous one
// completes.
concat(...callList).subscribe({
complete: () => console.log("All files have completed"),
error: err => console.log("Aborted call list due to error,", err)
});
createCall(fileItem: FileItemType): Observable<never>{
// Use defer to turn a promise into an observable 
return defer(() => this.fileService.translateFile(fileItem.file)).pipe(
// If processing, then wait untill available, otherwise just complete
switchMap(translateFileResponse => {
if (translateFileResponse && get(translateFileResponse, 'status') == 'processing') {
//do some processing here 
return this.delayByDocumentStatus(translateFileResponse.fileId);
} else {
return EMPTY;
}
}),
// Catch and then rethrow error. Right now this doesn't do anything, but If 
// you handle this error here, you won't abort the entire call list below on 
// an error. Depends on the behaviour you're after.
catchError(error => {
// show error
return throwError(() => error);
})
);
}
delayByDocumentStatus(fileId:string): Observable<never>{
// Hit getDocumentStatus every 3 seconds, unless it takes more
// than 3 seconds for api to return response, then wait 6 or 9 (etc)
// seconds.
return interval(3000).pipe(
exhaustMap(_ => this.fileService.getDocumentStatus(fileId)),
takeWhile(res => res.results.status != "available"),
ignoreElements(),
tap({
complete: () => console.log("action complete for this fileId: ", fileId)
})
);
}

最新更新