如何在RxJS中重试管道内的可观察对象?



我有以下代码:

notificationsWsSubject.pipe(
filter((socket): socket is Socket => !!socket),
switchMap(socket => fromEvent<Socket.DisconnectReason>(socket, 'disconnect')),
tap(() => wsConnectedSubject.next(false)),
filter(reason => (['ping timeout', 'transport close', 'transport error'] as Socket.DisconnectReason[]).includes(reason)),
switchMap(() => signedInObservable),
switchMap(user => forkJoin([of(user), from(getNotificationsWebsocketTicket())])),
).subscribe(values => {
// Connect with websocket
}, error  => {
// Throw error to user
})

总流程:

  • 监听socket.io-client
  • 中的disconnect事件
  • 如果断开原因是由于网络错误,则继续
  • signedInObservable获取当前登录的用户
  • 通过调用getNotificationsWebsocketTicket()(并使用from()从它创建Observable)在服务器中生成票据
  • subscribe(value => ... )做事情

我的问题是,我想在from(getNotificationsWebsocketTicket())失败的情况下重试,每次失败之间延迟5秒。
只有在3次重试后,我才希望整个主可观察对象失败。

类似:

from(getNotificationsWebsocketTicket()).pipe(delayedRetry(3, 5000))

这可能吗?

是的,你可以这样做:

from(getNotificationsWebsocketTicket()).pipe(
retryWhen(e$ => e$.pipe(
take(3),
delay(5000)
))
);

retryWhen是一个很有趣的操作符。e$是一个由retryWhen操作符管理的可观察对象。当源出错时,retryWhen将错误发送给e$

接下来发生的事情取决于你的lambda返回的可观察对象:

Errors是错误,
Completes是完成,
但是排放(next)是提示重试

最新更新