同步RX中的多个订阅



是否可以强制多个RX订阅到不同的可观察对象连续运行(而不是同时运行)?

我知道我可以使用EventLoopScheduler,但这会降低性能,因为所有的处理将在一个线程上完成。

如果你打算运行一个可观察对象直到OnCompleted,然后开始下一个,你应该看看Concat。如果你想同时订阅多个不同的可观察对象,你可以使用Merge(如果语义对你的场景有意义的话)。如果Merge不合适,我建议在观察者方法或你已经知道的EventLoopScheduler中使用标准的线程同步方法(lock, Monitor等)之一。

编辑原答案保留在下面

是的,可以强制执行串行观察器。然而,你是否需要取决于可观察到的东西。通常,热可观察对象已经连续运行,而冷可观察对象则不会。这是冷热观测的工作方式不同所产生的副作用。为了使冷的观测物变热,从而使观测物连续运行,使用Publish。下面是演示各种行为的示例。

Sub Main()
    'hot observable, runs serially
    Dim trigger As New ObsEvent
    Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
                    Sub(h) AddHandler trigger.Triggered, h,
                    Sub(h) RemoveHandler trigger.Triggered, h)
    Dim sub1 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 1")
                              End Sub)
    trigger.Trigger("event trigger 1")
    Dim sub2 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 2")
                              End Sub)
    trigger.Trigger("event trigger 2")
    Console.WriteLine()
    Console.WriteLine()
    'cold observable, runs "simultaneously"
    Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5))
    sub1 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 1 completed"))
    Thread.Sleep(500)
    sub2 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 2 completed"))
    'cold observable turned hot, runs serially
    Dim pobs = tobs.Publish()
    sub1 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P1 completed"))
    Thread.Sleep(500)
    sub2 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P2 completed"))
    pobs.Connect()
    Console.ReadKey()
End Sub

相关内容

  • 没有找到相关文章

最新更新