如何在超时扩展中获取最新的序列元素



我想知道在超时触发之前获得序列中最新元素的最佳方法是什么?

我有一个代码,可以不时地ping远程服务,我希望能够识别出已经离线的服务。

使用超时扩展我来到这个:

heartbeatResponseObservable.Timeout(Timeout, Observable.Return(new HeartbeatBusMessage.Timeout()))

这有点工作,但它不允许我找到哪个服务已经消失。我想要的是Timeout扩展,将流中的最新消息作为参数,以在它产生的错误消息中提供一些信息。

我如何获得超时扩展内的最新序列元素?

public static IObservable<T> TimeOutExtension<T>(
    this IObservable<T> source, 
    TimeSpan timeSpan)
{
    // On Timeout complete with an empty Observable.
    var completeOnTimeout = source
                                .Timeout(timeSpan)
                                .Catch<T, TimeoutException>(ex => Observable.Empty<T>());
    // Join the source w/ the empty Observable created on timeout.
    var beforeTimeout =
        source.Join(completeOnTimeout, 
        _ => source, 
        _ => completeOnTimeout, 
        (s, c) => s);
    // Return last
    return beforeTimeout.LastAsync();
}

可以这样使用:

// Create 10 events quickly, then once every two seconds.
var source =
    Observable.Interval(TimeSpan.FromMilliseconds(100))
        .Take(10)
        .Concat(Observable.Interval(TimeSpan.FromSeconds(2)));
// Set a timeout of 1 second.
var last = source.TimeOutExtension(TimeSpan.FromSeconds(1));
last.Subscribe(Console.WriteLine); // outputs 9

相关内容

  • 没有找到相关文章

最新更新