我从反应式扩展开始,我遇到了一个问题,我不确定我是否在正确的轨道上。
我正在使用可观察量为具有 .NET 的事件代理创建和使用侦听器。我创建了一个"IncomingMessage"类,其中包含来自事件代理的消息,当我它们进来时,我开始在Observerable.Create函数中创建侦听器。这很有效。
现在我还想从侦听器获取状态通知,如"正在连接..."、"已连接"、"正在关闭..."它不是传入消息,所以我创建了一个具有"消息"属性的类"BrokerEvent"以及"传入消息"和"BrokerEvent"的接口。现在我通过观察者发送两者。在它们发生时继续下一个(...(。到目前为止,这也运作良好。
但是,在订阅者方面,我现在在过滤我需要的事件时遇到了一些问题。
我愿意:
GetObservable().Where(x => x is BrokerEvent ||
(x is IncomingMessage msg &&
msg.User == "test")).Subscribe(...)
这有效,但是我需要再次找出订阅中的类型,这似乎有点丑陋。
尝试了一会儿后,我现在最终这样做了......
var observable = GetObservable().Publish();
observable.OfType<BrokerEvent>().Subscribe(...);
observable.OfType<IncomingMessage>().Where(x=>x.User == "test").Subscribe(...);
var disposable = observable.Connect();
这似乎也有效,但由于我是反应式扩展的新手,我不太确定这是否有任何不必要的副作用。我也不确定这是否是将状态消息包含在流中的"正确"方式。有没有更好的方法来处理这个问题(可以在不使用发布的情况下(还是要走的路?
停止倾听就足以处理我从.Connect(( 还是我还必须处理我从 .订阅((?
谢谢!
我假设GetObservable
返回IObservable<object>
,这并不理想。
执行上述代码的最佳方法如下:
var observable = GetObservable().Publish();
var sub1 = observable.OfType<BrokerEvent>().Subscribe(_ => { });
var sub2 = observable.OfType<IncomingMessage>().Where(x => x.User == "test").Subscribe(_ => { });
var connectSub = observable.Connect();
var disposable = new CompositeDisposable(sub1, sub2, connectSub);
然后,复合一次性物品将在处理时处理掉所有儿童。
如果两个消息流彼此无关,则该方法将起作用。但是,由于您基本上有一个控制消息流和一个数据消息流,因此我假设来自一个的消息在处理另一个消息时可能很重要。在这种情况下,您可能希望将其视为一个流。
在这种情况下,您可能希望为可观察量创建区分联合类型,这可能会使处理更容易。
您可以做的是创建一个"事件处理程序"类,其中包含三个重载的"进程消息"。一个用于不执行任何操作的对象(默认(,一个用于状态类型,一个用于传入消息。在。订阅使用此语法
m=>processor.Process((dynamic)m)
这将根据需要调用正确的实现或不执行任何操作。
如果要在调用 Process 之前进行过滤,可以引入一个公共类(ProcessableMesage 或类似类(,也可以调用 .合并 OfType 流,或者可以通过使用动态 MessageFilter 类来采用与上述相同的方法。