Rx:聚合 IObservable<...> to IObservable<Unit>



我有一个IObservable<...>类型的可观测项,需要从函数返回一个IObservable<Unit>类型的可观察项。目前我的做法如下:

IObservable<Unit> DoSomething() {
    var observable = SomeOtherMethodThatReturnsObservable();
    // convert Convert IObservable<...> to IObservable<Unit>
    var ret = new AsyncSubject<Unit>();
    observable.Subscribe(_ => { }, ret.OnCompleted);
    return ret;
}

还有更好的方法吗?也许是扩展方法什么的?

我不会使用您在生产中给出的示例,因为它没有考虑以下内容:

  • 如果Observable出错,会发生什么?
    • 错误被吃掉了,消费者永远不知道可观察的东西已经结束
  • 如果消费者处理了它的订阅,会发生什么?
    • 未处置基础订阅
  • 如果消费者从未订阅,会发生什么?
    • 基本可观测数据仍被认购

通常,使用内置运算符可以提供最佳实现。

这是一种"Rx-y"的方法

source.IgnoreElements()
    .Cast<Unit>()
    .Concat(Observable.Return(Unit.Default));

这里有另一种做同样事情的方法:)

没有内置的操作员。这可以说是更有效的,但这里并没有真正的收益。

// .Cast<Unit>()
Observable.Create<Unit>(o => { // Also make sure you return the subscription!
    return source.Subscribe(
        // .IgnoreElements()
        _ => { },
        // Make sure errors get passed through!!!
        o.OnError,
        // .Concat(Observable.Return(Unit.Default));
        () => {
            o.OnNext(Unit.Default);
            o.OnCompleted();
        });
});

以下是如何只写一次

public static class ObsEx
{
    public static IObservable<Unit> WhenDone<T>(this IObservable<T> source)
    {
        return Observable.Create<Unit>(observer =>
        {
            return source.Subscribe(
                _ => { },
                observer.OnError,
                () => {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                });
        });
    }
}

并像这样使用:source.WhenDone();

相关内容

  • 没有找到相关文章