可观察 Rx 中的回调



我正在寻找一种优雅的方式来使用 Rx 从普通回调委托创建Observable,类似于 Observable.FromEventPattern

比如说,我正在包装 Win32 EnumWindows API,它会回调我提供的EnumWindowsProc

我知道我可以为此回调创建一个临时 C# 事件适配器并将其传递给FromEventPattern。此外,我可能可以手动实现IObservable,因此它会从我的EnumWindowsProc回调中调用IObserver.OnNext

是否存在我缺少的用于在 Rx 中包装回调的现有模式?

您可以使用可用于从命令式编程世界移动到Rx功能世界的Subject<T>

Subject<T>同时实现了IObservable<T>IObserver<T>,因此您可以调用其OnNextOnErrorOnCompleted方法,订阅者将收到通知。

如果要将Subject<T>公开为属性,则应使用.AsObservable(),因为这隐藏了IObservable<T>实际上是Subject<T>的事实。它使诸如((Subject<string>) obj.Event).OnNext("Foo")之类的事情变得不可能。

请记住

,像EnumWindows中使用的回调与Rx有细微的不同。 具体来说,回调可以通过其返回值与调用方通信。 Rx 观察者无法执行此操作。 此外,回调可以接收多个参数,但 Rx 观察器接收单个值。 因此,您需要将多个参数包装到单个对象中。

考虑到这一点,使用Subject的替代方法是使用 Observable.Create . 这样,您仅在实际存在观察者时才注册回调,如果该观察者取消订阅,则取消注册。

对于您使用示例的同步 API,您可以执行以下操作。 请注意,在此示例中,实际上没有一种方法可以在中途注销回调,因为在我们可以返回取消订阅一次性之前,这一切都是同步发生的。

public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.Create<Foo>(observer =>
    {
        FooApi.enumerate(arg1, arg2, e =>
        {
            observer.OnNext(new Foo(e));
            return true;
        });
        // In your case, FooApi.enumerate is actually synchronous
        // so when we get to this line of code, we know
        // the stream is complete.
        observer.OnCompleted();
        return Disposable.Empty;
    });
}
// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item

我们可以通过引入一点异步性来解决无法提前停止的问题,这将使观察者有时间获得可以处理的一次性产品以通知您。 我们可以使用CreateAsync来获取一个CancellationToken,该将在观察者取消订阅时取消。 我们可以在 Task.Run 中运行 FooApi 代码:

public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.CreateAsync<Foo>(async (observer, ct) =>
    {
        await Task.Run(() => FooApi.register_callback(arg1, arg2, e =>
        {
            observer.OnNext(e);
            // Returning false will stop the enumeration
            return !ct.IsCancellationRequested;
        }));
        observer.OnCompleted();
    });
}

在更传统的异步回调 API 中,您在某个时间点注册并在其他时间点取消注册,您可能会有更像这样的东西:

public static IObservable<Foo> WrapFooApi(string args)
{
    return Observable.Create<Foo>(observer =>
    {
        FooToken token = default(FooToken);
        var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
        token = FooApi.Register(args, e =>
        {
            observer.OnNext(new Foo(e));
        });
        return unsubscribe;
    });
}

最新更新