使用 Observable.Publish 和反应式扩展



我对使用Observable.Publish进行多播处理的生命周期有点困惑。应该如何正确使用连接?与直觉相反,我发现我不需要为组播观察者调用连接来启动他们的订阅。

var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);
// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()
// Does calling 
disposable.Dispose();
// unsubscribe field0 and field1?

编辑

我的困惑是为什么我在没有打电话时成功订阅 在 IConnectableObservable 显式上连接。但是我打电话 等待 IConnectableObservable 隐式调用 Connect

Public Async Function MonitorMeasurements() As Task

Dim cts = New CancellationTokenSource
Try
Using dialog = New TaskDialog(Of Unit)(cts)
Dim measurementPoints = 
MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
TakeUntil(dialog.CancelObserved).Publish()
Dim viewModel = New MeasurementViewModel(measurementPoints)
dialog.Content = New MeasurementControl(viewModel)
dialog.Show()
Await measurementPoints
End Using
Catch ex As TimeoutException
MessageBox.Show(ex.Message)
Catch ex As Exception
MessageBox.Show(ex.Message)
End Try
End Function

请注意,我的 TaskDialog 公开了一个名为 CancelObserve 的可观察量 按下取消按钮时。

溶液

该解决方案发布在@asti的链接中。这是RX团队在该链接中的引述

请注意,使用 await 会导致订阅发生,从而使可观察序列变热。此版本中包括对 IConnectableObservable 的等待支持,这会导致将序列连接到其源并订阅它。如果没有连接调用,等待操作将永远不会完成

源上的Publish返回一个IConnectableObservable<T>,该本质上是用Connect方法IObservable<T>的。可以使用Connect及其返回IDisposable来控制对源的订阅。

Rx被设计成一个即发即弃的系统。订阅在显式释放订阅或完成/错误之前不会终止。

即,disp0 = field0.Subscribe(...); disp1 = field1.Subscribe(...)- 订阅在显式释放disp0, disp1之前不会终止 - 这与与多播源的连接无关。

您可以在不干扰以下管道的情况下连接和断开连接。不用担心手动管理连接的一种更简单的方法是使用.Publish().RefCount()只要至少有一个观察者仍然订阅了连接,它就会保持连接。这称为预热可观察量。


针对问题中的编辑进行了更新

OP在IConnectableObservable<T>上打电话给await

来自 Rx 的发行说明:

..使用 await 通过导致 订阅进行。此版本中包括等待支持 对于 IConnectableObservable,这会导致将序列连接到 它的来源以及订阅它。如果没有连接呼叫, 等待操作永远不会完成。

示例(取自同一页面)

static async  void Foo()
{
var xs = Observable.Defer(() =>
{
Console.WriteLine("Operation started!");
return Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
});
var ys = xs.Publish();
// This doesn't trigger a connection with the source yet.
ys.Subscribe(x => Console.WriteLine("Value = " + x));
// During the asynchronous sleep, nothing will be printed.
await Task.Delay(5000);
// Awaiting causes the connection to be made. Values will be printed now,
// and the code below will return 9 after 10 seconds.
var y =  await ys;
Console.WriteLine("Await result = " + y);
}

发布允许您共享订阅。这显然对于使冷可观察序列热最有用。即,采用导致某些订阅副作用(可能是与网络的连接)发生的序列,并确保副作用执行一次,并且序列的结果在消费者之间共享。

实际上,您在冷序列上调用发布,订阅使用者,然后在订阅后连接已发布的序列,以缓解任何争用条件。

所以基本上,你上面做了什么。

对于已经很热门的序列,例如主题,FromEventPattern或已经发布和连接的内容,这在很大程度上毫无意义。

从 Connect() 方法中释放值将"断开"序列,防止使用者获取更多值。如果使用者订阅中的任何一个想要提前分离,也可以释放这些订阅。

说了这么多,你似乎在做正确的事情。您看到的问题是什么?我假设您正在连接到一个已经很热的序列。

相关内容

  • 没有找到相关文章