RxJS重试未按预期进行



我很难让RxJS的retry运算符与map运算符结合使用。

本质上,我试图做的是获取一个值流(在本例中,GitHub用户用于测试目的),并对每个值进行一些操作。

然而,我希望能够以一种允许我为该操作设置有限次数重试的方式来处理错误。我还假设RxJS异步处理这样的事情。

Rx.Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
.concatAll(x => Rx.Observable.of(x))
.map(x => {
if(x.id % 5 == 0) {
console.log("error");
return Rx.Observable.throw("error");
}
return x;
})
.retry(3)
.subscribe(x => console.log(x), e => console.log(e));

这就是我到目前为止所得到的,然而,在它应该重试相同值3次的情况下,它只打印一次"错误",这不是我想要的行为。

我是错过了一些琐碎的东西,还是RxJS根本不可能做到这一点?简而言之,我想异步执行对值的操作,将所述操作重复有限次,但允许其他有效操作并行继续。我只使用promise就实现了类似的功能,但代码相当混乱,我希望RxJS能让我读起来更好。

编辑-删除早期失败的解决方案以保持清晰(一些注释可能不再相关)

要重试单个失败,您需要中断到单个请求并重试这些请求。

为了能够正确处理错误,请将流拆分为每个预期结果的子流。

我还添加了一些代码,允许其中一次重试成功。

const Observable = Rx.Observable
// Define tests here, allow requestAgain() to pass one
const testUser = (user) => user.id % 5 !== 0
const testUserAllowOneSuccess = (user) => user.id === 5 || user.id % 5 !== 0
const requestAgain = (login) => Observable.of(login)
.switchMap(login => jQuery.getJSON("https://api.github.com/users/" + login ))
.map(x => {
if(!testUserAllowOneSuccess(x)) {
console.log("logging error retry attempt", x.id);
throw new Error('Invalid user: ' + x.id)
}
return x;
})
.retry(3)
.catch(error => Observable.of(error))
const userStream = Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
.concatAll()
const passed = userStream.filter(x => testUser(x))
const failed = userStream.filter(x => !testUser(x))
.flatMap(x => requestAgain(x.login))
const retryPassed = failed.filter(x => !(x instanceof Error))
const retryFailed = failed.filter(x => (x instanceof Error))
.toArray()
.map(errors => { throw errors })
const output = passed.concat(retryPassed, retryFailed)
output.subscribe(
x=> console.log('subscribe next', x.id ? x.id : x), 
e => console.log('subscribe error', e)
);

CodePen 工作示例

我看到几个问题可能会给您带来问题:

1) 在你的map语句中,你通过返回一个可观察到的来进行投掷。如果成功案例也返回了一个可观察到的值,然后你将其压平(使用concat、switch或merge),但你的成功案例只是发出一个值,并且永远不会压平,那么这将是合适的。因此,当返回Throw observable时,它永远不会变平,因此只会通过并记录可观察对象。

相反,您可以只做一个简单的javascriptthrow 'error'(尽管考虑抛出一个错误对象而不是字符串)

2) 当重试出现错误时,它将重新订阅。如果您使用可观察性来进行提取,这意味着它将再次进行提取。然而,您使用一个promise进行抓取,然后只根据该promise创建了一个可观察的。因此,当你重新订阅observable时,它最终会使用与第一次相同的promise对象,并且该promise已经处于解析状态。不会进行额外的提取。

要解决这个问题,您需要有一个可观察的获取版本。如果你需要从头开始制作,它可能看起来像这样:

Rx.Observable.create(observer => {
jQuery.getJSON("https://api.github.com/users")
.then(result => {
observer.next(result);
observer.complete();
}, err => {
observer.error(err);
observer.complete();
});
})

不过,如果你打算经常这样做,我建议你创建一个函数来完成包装。

编辑:

我的最终目标是基本上能够获取一组api URL,并使用每个URL启动get请求。如果get请求由于任何原因失败,它应该尝试总共3次以再次到达该端点。然而,这种get请求的失败不应阻止其他请求的发生,它们应该并行发生。此外,如果一个get请求失败了3次,那么它应该被标记为失败,并且不应该停止整个过程。到目前为止,我还没有找到使用RxJS的方法。

然后我会做这样的事情:

function getJson (url) {
return Rx.Observable.create(observer) => {
jQuery.getJSON(url)
.then(result => {
observer.next(result);
observer.complete();
}, err => {
observer.error(err);
observer.complete();
});
}
}
const endpoints = ['someUrl', 'someOtherUrl'];
const observables = endpoints.map(endpoint => 
return getJson(url)
.retry(3)
.catch(err => Rx.Observable.of('error'));
});
Rx.Observable.forkJoin(...observables)
.subscribe(resultArray => {
// do whatever you need to with the results. If any of them
//   errored, they will be represented by just the string 'error'
});

映射函数返回Rx.Observable.throw("error");。这是错误的,因为下面的retry()将接收它的包装(即作为Observable<Observable<T>>而不是Observable<T>。您可以使用flatMap来纠正这一点,但也需要通过Rx.Observable.of(x)包装该值。

.flatMap(x => {
if(x % 5 == 0) {
console.log("error");
return Rx.Observable.throw("error");
}
return Rx.Observable.of(x);
})
.retry(3)
.subscribe(x => console.log(x), e => console.log(e));

另一种选择是throw new Error(),而不是尝试return Rx.Observable.throw("error");


确保重试正确的

Rx.Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
.retry(3)
.concatAll(x => Rx.Observable.of(x))
...

然后用from转换promise是不可重复的

示例:https://stackblitz.com/edit/rxjs-dp3md8

但是,使用Observable.create,效果良好

如果有人有让承诺可重复的解决方案,请发表评论。

最新更新