我有一个类,它负责以频繁但不规则的间隔生成事件,其他类必须使用和操作。 我想使用反应式扩展来完成此任务。
消费者方面非常简单;我让我的消费者类实现了IObserver<Payload>
一切似乎都很好。 问题出在生产者类上。
根据文档,不建议直接实现IObservable<Payload>
(即,将我自己的实现用于IDisposable Subscribe(IObserver<Payload> )
。 它建议改为使用Observable.Create()
函数集进行组合。 由于我的类将运行很长时间,因此我尝试使用 var myObservable = Observable.Never()
创建一个可观察量,然后,当我有新的有效负载可用时,调用 myObservable.Publish(payloadData)
。 但是,当我这样做时,我似乎并没有在我的消费者中达到OnNext
实现。
我认为,作为一种解决方法,我可以在我的类中创建一个事件,然后使用 FromEvent
函数创建可观察量,但这似乎是一种过于复杂的方法(即,可观察量的新热点"需要"事件才能工作似乎很奇怪(。 我在这里忽略了一个简单的方法吗? 创建自己的可观察源的"标准"方法是什么?
创建一个new Subject<Payload>
并调用它OnNext
方法来发送事件。您可以与观察者一起Subscribe
主题。
主题的使用经常引起争议。有关主题使用(链接到入门(的全面讨论,请参阅此处 - 但总而言之,这个用例听起来有效(即它可能符合本地 + 热门标准(。
顺便说一句,订阅接受委托的重载可能会消除您提供IObserver<T>
实现的需要。
Observable.Never()
不"发送"通知,您应该使用Observable.Return(yourValue)
如果您需要包含具体示例的指南,我建议您阅读 Rx 简介
除非我遇到更好的方法,否则我现在决定的是使用BlockingCollection
。
var _itemsToSend = new BlockingCollection<Payload>();
IObservable<MessageQueuePayload> _deliverer =
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default);