(在反应扩展(Rx)中使用Zip运算符)
将流对组合为一个而不超时
var xyZipped = xStream.Zip(yStream, (x, y) =>
{
Debug.WriteLine("Latest Pair Has Arrived");
return new List<SomeType> { x, y };
});
但是,如何在每个流中的两个值之间引入允许的最大时间间隔,以便如果超过值间间隔,则
xyZipped
不会输出任何值如果两个值之间经过太长时间,则也应重置配对,即对于超时后发生的另一个配对,应在每个流中产生一个新值(而不仅仅是一个)。
还是使用不同的运算符/实现来实现这种流逻辑更好
您可以只使用Rx组合子。既然您的主要目标是Zip
,那么让我们从Zip
开始,然后应用您的到期条件。
public static IObservable<TOut> ZipWithExpiry<TLeft, TRight, TOut>(
IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, TRight, TOut> selector,
TimeSpan validity)
{
return Observable.Zip(left.Timestamp(), right.Timestamp(), (l, r) => Tuple.Create(l, r))
.Where(tuple => Math.Abs((tuple.Item1.Timestamp - tuple.Item2.Timestamp).TotalSeconds) < validity.TotalSeconds)
.Select(tuple => selector(tuple.Item1.Value, tuple.Item2.Value));
}
如果要检查流中的相邻值,可以使用TimeInterval
运算符而不是Timestamp
对其进行重写。