我正在学习Rx并学习一些语义。作为一个实验,我正在构建一个可观察的计时器,它在第十次计时时调用OnError。到目前为止,我有两种方法,我认为表现出相同的行为:
var timer = Observable.Interval(TimeSpan.FromMilliseconds(200));
// method 1
Observable.Create<long>(
x => timer.Subscribe(tick => {
if (tick == 10)
{
x.OnError(new Exception());
}
x.OnNext(tick);
}));
// method 2
timer.Select(x => {
if (x == 10)
{
throw new Exception();
}
return x;
});
我假设这两种方法的行为完全相同,这是正确的吗?如果没有,有什么区别?
第二种方法是而不是等价的,抛出选择器会导致看起来相同的未定义行为™.以下是其他几种方法:
Observable.Interval(TimeSpan.FromMilliseconds(200))
.Take(9)
.Concat(Observable.Throw<long>(new Exception("Die!")));
Observable.Interval(TimeSpan.FromMilliseconds(200))
.SelectMany(x => {
if (x < 10) return Observable.Return(x);
return Observable.Throw<long>(new Exception("Die!"));
});