我有一个方法void OnAction(Action<Person> callback)
,我想使用反应扩展(Rx(从中创建一个IObservable<T>
。
我找到了两种可以帮助我的方法:Observable.FromEvent()
和Observable.Start()
:
var observable = Observable.Start(() =>
{
Person person = null;
_mngr.OnAction(p => person = p);
return person;
});
和:
var observable = Observable.FromEvent<Person>(
action => _mngr.OnAction(action), //Add Handler
action => // Remove Handler
{
});
第一个有一个闭包,我必须评估if person != null
:
var foo= observable.Where(p =>
{
if(p!=null) //...
});
第二个采用Action参数,该参数将给定的事件处理程序与底层.NET事件分离。。。但是OnAction方法不是.NET事件。
这两种方法都很好,但(在我看来(闻起来。。。
那么,从OnAction方法创建IOobservable的最佳方法是什么?
详细阐述Chris的回答并发表您的评论。从这里开始:
var personAsObservable = Observable.Create<Person>(observer => {
_mngr.OnAction(person => {
observer.OnNext(person);
observer.OnCompleted();
});
return Disposable.Empty;
});
目前,这将导致为每个订户调用OnAction
。
避免这种情况的一般方法是发布可观察到的。发布流会导致订阅者共享事件。
Publish
运算符返回一个可连接的可观察。这可以接受订阅者,但在您调用Connect()
(一个返回IDisposable
的方法,您可以使用它来控制与底层可观察对象的单个连接(之前,不会实际订阅底层流。
有几个与发布相关的运算符可以帮助您管理对基础流的订阅。
RefCount
使用可连接的可观察对象来管理连接,并在底层订阅运行时与订阅共享事件。一旦完成,后续订阅将重新启动。这可能足以满足您的目的。要使用它,请订阅以下内容(这是一个非常常见的Rx习惯用法(:
var personPub = personAsObservable.Publish().RefCount();
其他方法包括将Replay(n)
附加到可观察的源,其中n个事件将被缓存并重播给在底层流完成后到达的子序列订阅者。因此,如果你只想得到一次结果,这很有用。请注意,必须显式调用Replay
上的Connect
。您也可以直接拨打Publish
并管理自己的连接。
请注意,附加这些操作符不会改变底层可观察对象的行为——所有的发布、缓存等都是在附加的操作符上完成的。因此,在上面的示例中,订阅者应该使用personPub
。
显式控制连接如下所示:
IConnectableObservable<Person> personPub = personAsObservable.Publish();
var subscriberOne = personPub.Subscribe(...); // personAsObservable not started
var connection = personPub.Connect(); // *now* personAsObservable is subscribed
var subscriberTwo = personPub.Subscribe(...); // shares underlying subscription
// but could miss events
connection.Dispose(); // underlying connection terminated
// but may have already OnCompleted anyway
// in which case this is a no-op
var personAsObservable
= Observable.Create<Person>(observer => {
_mngr.OnAction(person => {
observer.OnNext(person);
observer.OnCompleted();
});
});
如果您希望确保此方法只调用一次,可以执行以下操作。
var publishedPerson = personaAsObservable.Replay(1);
publishedPerson.Connect();
publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);