我正在编写一个函数IsAlive
,以获取IObservable<T>
和时间跨度,并返回IObservable<bool>
。典型的用例是检测流服务器是否仍在发送数据。
我已经为它想出了以下解决方案,但我觉得它的工作原理还不是最清楚。
public static IObservable<bool> IsAlive<T>(this IObservable<T> source,
TimeSpan timeout,
IScheduler sched)
{
return source.Window(timeout, sched)
.Select(wind => wind.Any())
.SelectMany(a => a)
.DistinctUntilChanged();
}
有人有更好的方法吗?
FYI-以下是我尝试过的单元测试和现有方法:https://gist.github.com/997003
这应该有效:
public static IObservable<bool> IsAlive<T>(this IObservable<T> source,
TimeSpan timeout,
IScheduler sched)
{
return source.Buffer(timeout, 1, sched)
.Select(l => l.Any())
.DistinctUntilChanged();
}
这种方法也具有语义意义。每次一个项进入时,它都会填充缓冲区,然后传递true。每次超时,都会创建一个空缓冲区,并传递false。
编辑:
这就是为什么buffer-1方法比开窗更好的原因:
var sched = new TestScheduler();
var subj = new Subject<Unit>();
var timeout = TimeSpan.FromTicks(10);
subj
.Buffer(timeout, 1, sched)
.Select(Enumerable.Any)
.Subscribe(x => Console.WriteLine("Buffer(timeout, 1): " + x));
subj
.Window(timeout, sched)
.Select(wind => wind.Any())
.SelectMany(a => a)
.Subscribe(x => Console.WriteLine("Window(timeout): "+x));
sched.AdvanceTo(5);
subj.OnNext(Unit.Default);
sched.AdvanceTo(16);
收益率:
Buffer(timeout, 1): True
Window(timeout): True
Buffer(timeout, 1): False
具体来说,窗口在整个超时期间都是打开的,不会在项目进入时立即关闭和重置。这就是缓冲区限制1的作用所在。一旦项目进入,缓冲区及其计时器就会重新启动。
我可以将缓冲区重新实现为窗口,因为缓冲区的实现是窗口,但a)我认为缓冲区具有更好的语义,b)我不必SelectMany。Scott的Select和SelectMany可以组合成一个SelectMany(x=>x.Any()),但我可以避免整个lambda,并指定Enumerable.Any方法组,它无论如何都会绑定得更快(琐碎)。
怎么样:
source.Select(_ => true)
.Timeout(timeout, sched)
.DistinctUntilChanged()
.Catch<bool, TimeoutException>)(ex => Observable.Return(false));