快速提问,因为我觉得我一定错过了什么。我在这里使用rxjs,因为这是我面前的问题,我相信这是一个一般的reactiveX问题。
假设我有一组这样的Observable:
network_request = some_thing // An observable that produces the result of a network call
event_stream = network_request.flatMapLatest(function(v) {
return connectToThing(v) // This is another observable that needs v
}) // This uses the result of the network call to form a long-term event-based connection
所以,这还可以。但问题是。有时连接会失败。
所以,如果我做event_stream.retry()
,效果很好。当它失败时,它会重新进行网络调用,并获得一个新的v
来进行新的连接。
问题
如果我想把network_request
上的两个东西锁起来,会发生什么?也许我希望UI在每次网络调用完成时都做一些事情,比如在UI中显示一些关于v
的内容?
我可以做:
shared = network_request.share() // Other implementations call this refCount
event_stream = shared.flatMapLatest(...) // same as above
ui_stream = shared.flatMapLatest(...) // Other transformation on network response
如果我不执行share
,那么它会发出两个请求,这不是我想要的,但对于share
,当event_stream
后来出现错误时,它不会重试网络请求,因为refcount仍然为1(由于ui_stream
),所以它立即返回completed。
我想要什么
这显然是我编的一个小例子来解释我的困惑。我想要的是,每次event_stream
(长期连接)的结果出现错误时,都会发生以下所有情况:
- 再次发出网络请求
- 该请求的新响应用于建立新的连接,
event_stream
继续进行新的事件,就像什么都没发生一样 - CCD_ 11中也会发出相同的响应以进行进一步处理
这感觉不是一件复杂的事情,所以当涉及到拆分/扇出RX时,我一定只是误解了一些基本的东西。
我想我可以做的变通办法,但我想避免
我想导出这些可观察性,所以我不能只是重新构建它们,然后说"嘿,这是新东西"。我希望event_stream
和所有下游处理都不知道发生了断开连接。与ui_stream
相同。它有了新的价值。
我可能可以使用Subject
作为一个生成计数器来解决问题,每次我想重新启动时都会ping它,并在此基础上将network_request
放入flatMap
中,这样我就可以破坏share
。。。但这感觉是一个非常棘手的解决方案,所以我觉得必须有更好的方法。
我从根本上误解了什么?
随着我对这个问题的思考,我得到了与ionoy相同的认识,即retry
只是断开连接和重新连接,上游不知道这是由于错误造成的。
当我思考我想要什么时,我意识到我真的想要一个像链条一样的东西,也是一个旁观者,所以我现在有了这个:
network_request = some_thing
network_shadow = new Rx.Subject()
event_stream = network_request.do(network_shadow).flatMapLatest(...)
ui_stream = network_shadow.whatever
它的属性是,在event_stream
或下游的重试将导致整个事件重新启动,而ui_stream
是它自己的事情。那里的任何错误都没有任何作用,因为network_shadow
实际上不是event_stream
的订阅者,但只要主事件链正在运行,它就会剥离值。
我觉得这并不理想,但这比我担心的要好,那就是在doOnError
中有一个restartEverything.onNext()
,这会很恶心
我现在要处理这个问题,我们会看看它在哪里咬我…
您需要使用Publish
使您的寒冷变得可观察到的炎热。阅读http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#HotAndCold以获得一个很好的解释。