使用多个使用者进行ReactiveX重试



快速提问,因为我觉得我一定错过了什么。我在这里使用rxjs,因为这是我面前的问题,我相信这是一个一般的reactiveX问题。

假设我有一组这样的Observable:

network_request = some_thing // An observable that produces the result of a network call
event_stream = network_request.flatMapLatest(function(v) {
    return connectToThing(v) // This is another observable that needs v
}) // This uses the result of the network call to form a long-term event-based connection

所以,这还可以。但问题是。有时连接会失败。

所以,如果我做event_stream.retry(),效果很好。当它失败时,它会重新进行网络调用,并获得一个新的v来进行新的连接。

问题

如果我想把network_request上的两个东西锁起来,会发生什么?也许我希望UI在每次网络调用完成时都做一些事情,比如在UI中显示一些关于v的内容?

我可以做:

shared = network_request.share() // Other implementations call this refCount
event_stream = shared.flatMapLatest(...) // same as above
ui_stream = shared.flatMapLatest(...) // Other transformation on network response

如果我不执行share,那么它会发出两个请求,这不是我想要的,但对于share,当event_stream后来出现错误时,它不会重试网络请求,因为refcount仍然为1(由于ui_stream),所以它立即返回completed。

我想要什么

这显然是我编的一个小例子来解释我的困惑。我想要的是,每次event_stream(长期连接)的结果出现错误时,都会发生以下所有情况:

  1. 再次发出网络请求
  2. 该请求的新响应用于建立新的连接,event_stream继续进行新的事件,就像什么都没发生一样
  3. CCD_ 11中也会发出相同的响应以进行进一步处理

这感觉不是一件复杂的事情,所以当涉及到拆分/扇出RX时,我一定只是误解了一些基本的东西。

我想我可以做的变通办法,但我想避免

我想导出这些可观察性,所以我不能只是重新构建它们,然后说"嘿,这是新东西"。我希望event_stream和所有下游处理都不知道发生了断开连接。与ui_stream相同。它有了新的价值。

我可能可以使用Subject作为一个生成计数器来解决问题,每次我想重新启动时都会ping它,并在此基础上将network_request放入flatMap中,这样我就可以破坏share。。。但这感觉是一个非常棘手的解决方案,所以我觉得必须有更好的方法。

我从根本上误解了什么?

随着我对这个问题的思考,我得到了与ionoy相同的认识,即retry只是断开连接和重新连接,上游不知道这是由于错误造成的。

当我思考我想要什么时,我意识到我真的想要一个像链条一样的东西,也是一个旁观者,所以我现在有了这个:

network_request = some_thing
network_shadow = new Rx.Subject()
event_stream = network_request.do(network_shadow).flatMapLatest(...)
ui_stream = network_shadow.whatever

它的属性是,在event_stream或下游的重试将导致整个事件重新启动,而ui_stream是它自己的事情。那里的任何错误都没有任何作用,因为network_shadow实际上不是event_stream的订阅者,但只要主事件链正在运行,它就会剥离值。

我觉得这并不理想,但这比我担心的要好,那就是在doOnError中有一个restartEverything.onNext(),这会很恶心

我现在要处理这个问题,我们会看看它在哪里咬我…

您需要使用Publish使您的寒冷变得可观察到的炎热。阅读http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#HotAndCold以获得一个很好的解释。

最新更新