首先,我没有找到自定义实现ObservableBase或AnonymousObservable的好例子。我不知道我需要在我的情况下实施哪一个(如果有的话(。情况是这样的。
我使用第三方库,有一个类,我们称之为Producer,它允许我在其上设置一个委托,如objProducer.Attach(MyHandler(。MyHandler 将接收来自生产者的消息。我正在尝试围绕生产者创建一个包装器,以使其可观察,理想情况下使其成为一种独特的类型,而不是仅创建可观察的实例(如 Observable.Create(。
已编辑:第三方制作人具有以下界面
public delegate void ProducerMessageHandler(Message objMessage);
public class Producer : IDisposable {
public void Start();
public void Attach(ProducerMessageHandler fnHandler);
public void Dispose();
}
正如我提到的,我无法控制它的源代码。它打算像这样使用:创建一个实例,调用 Attach 并传递一个委托,调用 Start,它基本上在生产者收到或生成消息时启动在提供的委托内接收消息。
我正在考虑创建公共class ProducerObservable : ObservableBase<Message>
以便当有人订阅它时,我会(Rx库会(将消息推送给观察者。似乎我需要在我的 ProducerObservable 的构造函数中的某个地方调用 Attach ,然后我需要以某种方式在附加到它的观察器上调用 OnNext。这是否意味着我必须编写所有这些代码:向类添加LinkedList<IObserver<Message>>
观察者列表,然后在 ProducerObservable 上调用 SubscribeCore 抽象方法时添加观察器?然后显然我将能够在MyHandler中枚举LinkedList<IObserver<Message>>
并为每个调用OnNext。所有这些看起来都是可行的,但感觉并不完全正确。我希望 .net 反应式扩展能够更好地为这种情况做好准备,并且至少在基类的某个地方准备好LinkedList<IObserver<Message>>
的实现。
在使用 Rx 的代码中,"Producer"对象通常是通过公共属性或方法公开IObservable<T>
实例的对象。 Producer
类本身实现IObservable<T>
的情况不太常见,当它这样做时,它通过使用Rx
来做引擎盖下的繁重工作。 您绝对不想自己实现IObservable<T>
。
下面是一个示例,其中可观察量作为属性公开:
public class Producer
{
public Producer(ThirdPartyLib.Producer p)
{
var c = Observable.Create(observer =>
{
ProducerMessageHandler h = msg => observer.OnNext(msg);
p.Attach(h);
p.Start();
return Disposable.Empty;
}).Publish();
// Connect the observable the first time someone starts
// observing
Stream = Observable.Create(observer =>
{
var subscription = c.Subscribe(observer);
if (Interlocked.Exchange(ref _connected, 1) == 0)
{
c.Connect();
}
return subscription;
});
}
private int _connected;
public IObservable<Message> Stream { get; private set; }
}
下面是同一个示例,我们通过委托给 Rx 来实际实现IObservable<T>
:
public class Producer : IObservable<Message>
{
public Producer(ThirdPartyLib.Producer p)
{
var c = Observable.Create(observer =>
{
ProducerMessageHandler h = msg => observer.OnNext(msg);
p.Attach(h);
p.Start();
return Disposable.Empty;
}).Publish();
// Connect the observable the first time someone starts
// observing
_stream = Observable.Create(observer =>
{
var subscription = c.Subscribe(observer);
if (Interlocked.Exchange(ref _connected, 1) == 0)
{
c.Connect();
}
return subscription;
});
}
private IObservable<Message> _stream;
// implement IObservable<T> by delegating to Rx
public IDisposable Subscribe(IObserver<Message> observer)
{
return _stream.Subscribe(observer);
}
}
以下是你应该做的事情,以保持Rx"友好":
public static class ObservableProducer
{
public static IObservable<Message> Create()
{
return
Observable.Using(() => new Producer(), p =>
Observable.Create<Message>(o =>
{
ProducerMessageHandler handler = m => o.OnNext(m);
p.Attach(handler);
return Disposable.Create(() => o.OnCompleted());
}));
}
}
你会像这样使用它:
IObservable<Message> query = ObservableProducer.Create();
应允许为所有新订阅创建多个Producer
实例 - 这就是 Rx 的工作方式。
但是,如果您只需要一个Producer
实例,请查看在此可观察对象上使用.Publish()
。
下面介绍如何确保单个Producer
实例是"自我管理"的:
IObservable<Message> query = ObservableProducer.Create().Publish().RefCount();
这将在第一个订阅上创建一个Producer
实例,并保留该Producer
,直到不再有任何订阅。这使得它成为"自我管理"和更好的解决方案,而不是滚动自己的课程。
如果你必须实现自己的类,那么你经常会犯错误。您作为此问题的答案添加的类有三个我可以看到的。
- 附加消息处理程序后实例化主题。如果生成者在附加过程中创建消息,则代码将失败。
- 您不会跟踪订阅。如果您不跟踪订阅,则无法处置它们。Rx 查询可以保留打开的昂贵资源,因此应尽早处置它们。
- 在处置生产者之前,您不会就此问题致电
.OnCompleted()
。
这是我对你的类的实现:
public class ProducerObservable : IObservable<Message>, IDisposable
{
private readonly Producer _Producer;
private readonly Subject<Message> _Subject;
private readonly CompositeDisposable _Disposables;
public ProducerObservable()
{
_Subject = new Subject<Message>();
ProducerMessageHandler fnHandler = m => _Subject.OnNext(m);
_Producer = new Producer();
_Producer.Attach(fnHandler);
_Producer.Start();
_Disposables = new CompositeDisposable();
_Disposables.Add(_Producer);
_Disposables.Add(_Subject);
}
public void Dispose()
{
_Subject.OnCompleted();
_Disposables.Dispose();
}
public IDisposable Subscribe(IObserver<Message> objObserver)
{
var subscription = _Subject.Subscribe(objObserver);
_Disposables.Add(subscription);
return subscription;
}
}
我还是不喜欢。在撰写本文时,我是[system.reactive]中拥有银牌的三个人之一(还没有人拥有金牌(,我从未实现过自己的可观察性。我只是刚刚意识到我没有打电话给.OnCompleted()
这个主题,所以我回去编辑了上面的代码。这是一个雷区。依靠内置运算符要好得多。
ObservableBase
存在的原因是为了帮助防止人们犯错误,但它并不能阻止它。
这个讨论只是给了我一个想法。不就是这个吗?
public class ProducerObservable : IObservable<Message>, IDisposable {
private readonly Producer _Producer;
private readonly Subject<Message> _Subject;
public ProducerObservable() {
_Produder = new Producer();
_Producer.Attach(Message_Received);
_Subject = new Subject<Message>();
_Producer.Start();
}
public void Dispose() {
_Producer.Dispose();
_Subject.Dispose();
}
public IDisposable Subscribe(IObserver<Message> objObserver) {
return _Subject.Subscribe(objObserver);
}
private void Message_Received(Message objMessage) {
_Subject.OnNext(objMessage);
}
}
因此,在我看来,我们避免了额外的级别,额外的可观察量,只有一个可观察量类型,基本上我只看到优点而没有缺点。