如何正确观察非标准事件



我是响应式扩展的新手,并且处理具有这样定义的事件的COM库:

public delegate void MyDelegate(int requestId, double price, int amount);
public event MyDelegate MyEvent;

我如何正确地观察这个?我尝试使用Observable.FromEvent(),但由于事件的参数不是类型EventArgs,我不知道FromEvent()FromEventPattern()是如何工作的。

我目前的解决方法是将自定义委托附加到事件,然后调用Subject.OnNext(),但我猜这不是我应该怎么做。

下面是我当前解决方法的一个例子:

        MyEvent += new MyDelegate((int requestId, double price, int amount) =>
        {
            Task.Run(() =>
            {
                var args = new MyArgs()
                {
                    requestId = requestId,
                    price = price,
                    amount = amount,
                };
                this.mySubject.OnNext(args);
            });
        });

它有一个特殊的FromEvent过载。这个函数的签名看起来是这样的:

IObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>(Func<Action<TEventArgs>, TDelegate> conversion, 
                                                         Action<TDelegate> addHandler, 
                                                         Action<TDelegate> removeHandler);

转换函数是这里重要的部分,基本上你告诉Rx你的委托如何映射到一个具体的类型。

在您的场景中,它最终看起来像这样:

Observable.FromEvent<MyDelegate, MyArgs>(
  converter => new MyDelegate(
                  (id, price, amount) => converter(new MyArgs { 
                                                        RequestId = id, 
                                                        Price = price, 
                                                        Amount = amount
                                                       })
               ),
  handler => MyEvent += handler,
  handler => MyEvent -= handler);

那么这一切都在做什么呢?在内部,它与您正在做的类似(我将从概念上解释它的作用,因为实现稍微复杂一些)。当进行新的订阅时,将调用转换函数,并将observer.OnNext作为converter参数传入。这个lambda将返回一个新的MyDelegate实例,该实例包装了我们提供的转换函数((id, price, amount) => ...)。这是随后传递给handler => MyEvent += handler方法的内容。

之后,每次事件被触发时,它将调用我们的lambda方法并将传递的参数转换为MyArgs的实例,然后将其传递给converter/observer.OnNext

此外,除了所有这些神奇的功能之外,它还会在您使用它时清理事件处理程序,优雅地将异常传递到下游,并通过在多个观察者之间共享单个事件处理程序来管理内存。

源代码

相关内容

  • 没有找到相关文章

最新更新