组合具有值之间最大时间间隔的流



(在反应扩展(Rx)中使用Zip运算符)

将流对组合为一个而不超时

        var xyZipped = xStream.Zip(yStream, (x, y) =>
        {
            Debug.WriteLine("Latest Pair Has Arrived");
            return new List<SomeType> { x, y };
        });
  • 但是,如何在每个流中的两个值之间引入允许的最大时间间隔,以便如果超过值间间隔,则xyZipped 不会输出任何值

  • 如果两个值之间经过太长时间,则也应重置配对,即对于超时后发生的另一个配对,应在每个流中产生一个新值(而不仅仅是一个)。

  • 还是使用不同的运算符/实现来实现这种流逻辑更好

您可以只使用Rx组合子。既然您的主要目标是Zip,那么让我们从Zip开始,然后应用您的到期条件。

public static IObservable<TOut> ZipWithExpiry<TLeft, TRight, TOut>(
                    IObservable<TLeft> left, 
                    IObservable<TRight> right, 
                    Func<TLeft, TRight, TOut> selector, 
                    TimeSpan validity)
        {
            return Observable.Zip(left.Timestamp(), right.Timestamp(), (l, r) => Tuple.Create(l, r))
                             .Where(tuple => Math.Abs((tuple.Item1.Timestamp - tuple.Item2.Timestamp).TotalSeconds) < validity.TotalSeconds)
                             .Select(tuple => selector(tuple.Item1.Value, tuple.Item2.Value));
        }

如果要检查流中的相邻值,可以使用TimeInterval运算符而不是Timestamp对其进行重写。

相关内容

  • 没有找到相关文章

最新更新