我需要观察一个以常规间隔产生值的流,但是有太多的值,所以我想占用一个值,说每十个值的值一个值。
等效的非RX做到这一点的方法是:
int step = 10; // take every 10th value
var numbers = Enumerable.Range(0, 100).Where((e, i) => i % step == 0);
在RX中做什么是什么惯用方法?
我将使用 Where
或 Buffer
:
int step = 10; // take every 10th value
// Where
var numbers = Observable.Range(0, 100).Where((e, i) => i % step == 0);
// Buffer
numbers = Observable.Range(0, 100).Buffer(step).Select(x => x[step - 1]);
我会避免使用Where
参数的CC_3操作员,因为担心可能的OverflowException
。如果源序列产生超过2,147,483,647个元素,则可能发生这种情况。LINQ和RX操作员都允许算术溢出发生,我不知道有任何阻止这种情况发生的方法。
这是一个自定义TakeEvery
操作员,在这方面很安全:
/// <summary>Samples the observable sequence by returning one every N elements,
/// ignoring the other elements.</summary>
public static IObservable<T> TakeEvery<T>(this IObservable<T> source, int step)
{
if (step < 1) throw new ArgumentOutOfRangeException(nameof(step));
return Observable.Defer(() =>
{
int index = -1;
return source.Where(value => (index = ++index % step) == 0);
});
}