如何使用rxjs有条件地重复一个promise



我想重复一个API调用,该调用返回Promise,有条件地使用rxjs。

API方法接收一个id,每次调用都会通过添加计数器前缀来更改该id。这些调用将重复进行,直到数据满足某个条件或计数器达到特定的数字X。如何使用rxjs完成此操作?

API方法:

fetchData(id):Promise<data>

try1:fetchData(id(

try2:fetchData(id_1(

try3:fetchData(id_2(

IMO,最好通过Promises或RxJS处理轮询,而不要混合它们。我会举例说明使用RxJS。

尝试以下

  1. 使用RxJSfrom函数将promise转换为observable
  2. 使用RxJS函数,如timerinterval,可以在固定的时间间隔内定期发出值
  3. 使用更高阶的映射运算符(如switchMap(从外部发射映射到API调用。有关不同类型的高阶映射运算符的简要描述,请参阅此处
  4. 使用两个takeWhile运算符,分别针对您的每个条件使用一个来完成订阅
  5. 使用filter运算符仅转发通过条件的排放
import { from } from 'rxjs';
fetchData(id: any): Observable<any> {  // <-- return an observable
return from(apiCall);                // <-- use `from` to convert Promise to Observable
}
import { timer } from 'rxjs';
import { filter, switchMap, takeWhile } from 'rxjs/operators';
timer(0, 5000).pipe(                        // <-- poll every 5 seconds
takeWhile((index: number) => index < 20)  // <-- stop polling after 20 attempts
switchMap((index: number) => 
this.someService.apiCall(index+1)       // <-- first emission from `timer` is 0
),
takeWhile(                                // <-- stop polling when a condition from the response is unmet
(response: any) => response.someValue !== someOtherValue,
true                                    // <-- emit the response that failed the test
),
filter((response: any) => 
response.someValue === someOtherValue   // <-- forward only emissions that pass the condition
)
).subscribe({
next: (response: any) => {
// handle response
},
error: (error: any) => {
// handle error
}
});

编辑:2ndtakeWhile中的条件与要求相反。我已经调整了条件,并包含了inclusive=true参数。感谢@Siddhant的评论。

您可以使用concatMap来确保一次只尝试一个调用。range给出了最大呼叫数,因为如果条件满足/不满足,takeWhile将提前(在范围完成之前(取消订阅。

看起来可能是这样的:

// the data met some condition
function metCondition(data){
if(data/*something*/){
return true;
} else {
return false
}
}
// the counter reach to a specific number X
const x = 30;
range(0, x).pipe(
concatMap(v => fetchData(`id_${v === 0 ? '' : v}`)),
takeWhile(v => !metCondition(v))
).subscribe(datum => {
/* Do something with your data? */
});

您可以尝试重试当:

let counter=0;
const example = of(1).pipe(
switchMap(x => of(counter)), // Replace of() with from(fetchData('id_'+counter))
map(val => {
if (val < 5) {
counter++;
// error will be picked up by retryWhen
throw val;
}
return val;
}),
retryWhen(errors =>
errors.pipe(
// log error message
tap(val => console.log(`Response was missing something`)),
)
)
);

这并不理想,因为它需要在外部范围内有一个计数器,但在有更好的解决方案(尤其是没有基于时间的重试(之前,这应该是可行的。

我知道您已经使用rxjs进行了指定,但是您也指定了fetchData()返回promise而不是observable。在这种情况下,我建议使用asyncawait,而不是rxjs。

async retryFetch() {
let counter = 0;
while (counter++ < 20 && !this.data) {
this.data = await this.fetchData(counter);
}
}

你可以把你想要的任何东西放在条件中。

即使你的api调用返回了一个可观察的结果,我仍然建议用promise来包装它,并使用这个可读性很强的解决方案。

下面的stackblitz用promise包装了一个标准的http.get,并实现了上面的函数。promise将随机返回数据或未定义数据。

https://stackblitz.com/edit/angular-ivy-rflclt?file=src/app/app.component.ts

let count = 0
const timerId = setTimout( () =>{
if(count){
fetchData(`id_${count}`)
}else{
fetchData('id')
}
count = count + 1
,60000}) //runs every 60000 milliseconds
const stopTimer = () =>{ //call this to stop timer
clearTimeout(timerId);
}

最新更新