在IObservable上检测IsAlive



我正在编写一个函数IsAlive,以获取IObservable<T>和时间跨度,并返回IObservable<bool>。典型的用例是检测流服务器是否仍在发送数据。

我已经为它想出了以下解决方案,但我觉得它的工作原理还不是最清楚。

public static IObservable<bool> IsAlive<T>(this IObservable<T> source, 
                                           TimeSpan timeout, 
                                           IScheduler sched)
{
    return source.Window(timeout, sched)
                 .Select(wind => wind.Any())
                 .SelectMany(a => a)
                 .DistinctUntilChanged();
}

有人有更好的方法吗?

FYI-以下是我尝试过的单元测试和现有方法:https://gist.github.com/997003

这应该有效:

public static IObservable<bool> IsAlive<T>(this IObservable<T> source, 
                                           TimeSpan timeout, 
                                           IScheduler sched)
{
    return source.Buffer(timeout, 1, sched)
                 .Select(l => l.Any())
                 .DistinctUntilChanged();
}

这种方法也具有语义意义。每次一个项进入时,它都会填充缓冲区,然后传递true。每次超时,都会创建一个空缓冲区,并传递false。

编辑:

这就是为什么buffer-1方法比开窗更好的原因:

var sched = new TestScheduler();
var subj = new Subject<Unit>();
var timeout = TimeSpan.FromTicks(10);
subj
    .Buffer(timeout, 1, sched)
    .Select(Enumerable.Any)
    .Subscribe(x => Console.WriteLine("Buffer(timeout, 1): " + x));
subj
    .Window(timeout, sched)
    .Select(wind => wind.Any())
    .SelectMany(a => a)
    .Subscribe(x => Console.WriteLine("Window(timeout): "+x));
sched.AdvanceTo(5);
subj.OnNext(Unit.Default);
sched.AdvanceTo(16);

收益率:

Buffer(timeout, 1): True
Window(timeout): True
Buffer(timeout, 1): False

具体来说,窗口在整个超时期间都是打开的,不会在项目进入时立即关闭和重置。这就是缓冲区限制1的作用所在。一旦项目进入,缓冲区及其计时器就会重新启动。

我可以将缓冲区重新实现为窗口,因为缓冲区的实现窗口,但a)我认为缓冲区具有更好的语义,b)我不必SelectMany。Scott的Select和SelectMany可以组合成一个SelectMany(x=>x.Any()),但我可以避免整个lambda,并指定Enumerable.Any方法组,它无论如何都会绑定得更快(琐碎)。

怎么样:

source.Select(_ => true)
    .Timeout(timeout, sched)
    .DistinctUntilChanged()
    .Catch<bool, TimeoutException>)(ex => Observable.Return(false));

相关内容

  • 没有找到相关文章

最新更新