我是响应式扩展的新手,并且处理具有这样定义的事件的COM库:
public delegate void MyDelegate(int requestId, double price, int amount);
public event MyDelegate MyEvent;
我如何正确地观察这个?我尝试使用Observable.FromEvent()
,但由于事件的参数不是类型EventArgs
,我不知道FromEvent()
或FromEventPattern()
是如何工作的。
我目前的解决方法是将自定义委托附加到事件,然后调用Subject.OnNext()
,但我猜这不是我应该怎么做。
下面是我当前解决方法的一个例子:
MyEvent += new MyDelegate((int requestId, double price, int amount) =>
{
Task.Run(() =>
{
var args = new MyArgs()
{
requestId = requestId,
price = price,
amount = amount,
};
this.mySubject.OnNext(args);
});
});
它有一个特殊的FromEvent
过载。这个函数的签名看起来是这样的:
IObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>(Func<Action<TEventArgs>, TDelegate> conversion,
Action<TDelegate> addHandler,
Action<TDelegate> removeHandler);
转换函数是这里重要的部分,基本上你告诉Rx你的委托如何映射到一个具体的类型。
在您的场景中,它最终看起来像这样:
Observable.FromEvent<MyDelegate, MyArgs>(
converter => new MyDelegate(
(id, price, amount) => converter(new MyArgs {
RequestId = id,
Price = price,
Amount = amount
})
),
handler => MyEvent += handler,
handler => MyEvent -= handler);
那么这一切都在做什么呢?在内部,它与您正在做的类似(我将从概念上解释它的作用,因为实现稍微复杂一些)。当进行新的订阅时,将调用转换函数,并将observer.OnNext
作为converter
参数传入。这个lambda将返回一个新的MyDelegate
实例,该实例包装了我们提供的转换函数((id, price, amount) => ...
)。这是随后传递给handler => MyEvent += handler
方法的内容。
之后,每次事件被触发时,它将调用我们的lambda方法并将传递的参数转换为MyArgs
的实例,然后将其传递给converter
/observer.OnNext
。
此外,除了所有这些神奇的功能之外,它还会在您使用它时清理事件处理程序,优雅地将异常传递到下游,并通过在多个观察者之间共享单个事件处理程序来管理内存。
源代码