我想重复一个API调用,该调用返回Promise,有条件地使用rxjs。
API方法接收一个id,每次调用都会通过添加计数器前缀来更改该id。这些调用将重复进行,直到数据满足某个条件或计数器达到特定的数字X。如何使用rxjs完成此操作?
API方法:
fetchData(id):Promise<data>
try1:fetchData(id(
try2:fetchData(id_1(
try3:fetchData(id_2(
IMO,最好通过Promises或RxJS处理轮询,而不要混合它们。我会举例说明使用RxJS。
尝试以下
- 使用RxJS
from
函数将promise转换为observable - 使用RxJS函数,如
timer
或interval
,可以在固定的时间间隔内定期发出值 - 使用更高阶的映射运算符(如
switchMap
(从外部发射映射到API调用。有关不同类型的高阶映射运算符的简要描述,请参阅此处 - 使用两个
takeWhile
运算符,分别针对您的每个条件使用一个来完成订阅 - 使用
filter
运算符仅转发通过条件的排放
import { from } from 'rxjs';
fetchData(id: any): Observable<any> { // <-- return an observable
return from(apiCall); // <-- use `from` to convert Promise to Observable
}
import { timer } from 'rxjs';
import { filter, switchMap, takeWhile } from 'rxjs/operators';
timer(0, 5000).pipe( // <-- poll every 5 seconds
takeWhile((index: number) => index < 20) // <-- stop polling after 20 attempts
switchMap((index: number) =>
this.someService.apiCall(index+1) // <-- first emission from `timer` is 0
),
takeWhile( // <-- stop polling when a condition from the response is unmet
(response: any) => response.someValue !== someOtherValue,
true // <-- emit the response that failed the test
),
filter((response: any) =>
response.someValue === someOtherValue // <-- forward only emissions that pass the condition
)
).subscribe({
next: (response: any) => {
// handle response
},
error: (error: any) => {
// handle error
}
});
编辑:2ndtakeWhile
中的条件与要求相反。我已经调整了条件,并包含了inclusive=true
参数。感谢@Siddhant的评论。
您可以使用concatMap来确保一次只尝试一个调用。range
给出了最大呼叫数,因为如果条件满足/不满足,takeWhile
将提前(在范围完成之前(取消订阅。
看起来可能是这样的:
// the data met some condition
function metCondition(data){
if(data/*something*/){
return true;
} else {
return false
}
}
// the counter reach to a specific number X
const x = 30;
range(0, x).pipe(
concatMap(v => fetchData(`id_${v === 0 ? '' : v}`)),
takeWhile(v => !metCondition(v))
).subscribe(datum => {
/* Do something with your data? */
});
您可以尝试重试当:
let counter=0;
const example = of(1).pipe(
switchMap(x => of(counter)), // Replace of() with from(fetchData('id_'+counter))
map(val => {
if (val < 5) {
counter++;
// error will be picked up by retryWhen
throw val;
}
return val;
}),
retryWhen(errors =>
errors.pipe(
// log error message
tap(val => console.log(`Response was missing something`)),
)
)
);
这并不理想,因为它需要在外部范围内有一个计数器,但在有更好的解决方案(尤其是没有基于时间的重试(之前,这应该是可行的。
我知道您已经使用rxjs进行了指定,但是您也指定了fetchData()
返回promise
而不是observable
。在这种情况下,我建议使用async
和await
,而不是rxjs。
async retryFetch() {
let counter = 0;
while (counter++ < 20 && !this.data) {
this.data = await this.fetchData(counter);
}
}
你可以把你想要的任何东西放在条件中。
即使你的api调用返回了一个可观察的结果,我仍然建议用promise来包装它,并使用这个可读性很强的解决方案。
下面的stackblitz用promise包装了一个标准的http.get
,并实现了上面的函数。promise将随机返回数据或未定义数据。
https://stackblitz.com/edit/angular-ivy-rflclt?file=src/app/app.component.ts
let count = 0
const timerId = setTimout( () =>{
if(count){
fetchData(`id_${count}`)
}else{
fetchData('id')
}
count = count + 1
,60000}) //runs every 60000 milliseconds
const stopTimer = () =>{ //call this to stop timer
clearTimeout(timerId);
}