我有一个IObservable<...>
类型的可观测项,需要从函数返回一个IObservable<Unit>
类型的可观察项。目前我的做法如下:
IObservable<Unit> DoSomething() {
var observable = SomeOtherMethodThatReturnsObservable();
// convert Convert IObservable<...> to IObservable<Unit>
var ret = new AsyncSubject<Unit>();
observable.Subscribe(_ => { }, ret.OnCompleted);
return ret;
}
还有更好的方法吗?也许是扩展方法什么的?
我不会使用您在生产中给出的示例,因为它没有考虑以下内容:
- 如果Observable出错,会发生什么?
- 错误被吃掉了,消费者永远不知道可观察的东西已经结束
- 如果消费者处理了它的订阅,会发生什么?
- 未处置基础订阅
- 如果消费者从未订阅,会发生什么?
- 基本可观测数据仍被认购
通常,使用内置运算符可以提供最佳实现。
这是一种"Rx-y"的方法
source.IgnoreElements()
.Cast<Unit>()
.Concat(Observable.Return(Unit.Default));
这里有另一种做同样事情的方法:)
没有内置的操作员。这可以说是更有效的,但这里并没有真正的收益。
// .Cast<Unit>()
Observable.Create<Unit>(o => { // Also make sure you return the subscription!
return source.Subscribe(
// .IgnoreElements()
_ => { },
// Make sure errors get passed through!!!
o.OnError,
// .Concat(Observable.Return(Unit.Default));
() => {
o.OnNext(Unit.Default);
o.OnCompleted();
});
});
以下是如何只写一次
public static class ObsEx
{
public static IObservable<Unit> WhenDone<T>(this IObservable<T> source)
{
return Observable.Create<Unit>(observer =>
{
return source.Subscribe(
_ => { },
observer.OnError,
() => {
observer.OnNext(Unit.Default);
observer.OnCompleted();
});
});
}
}
并像这样使用:source.WhenDone();