仅按标准筛选
changes.Where(p => Evaluate(p)).Subscribe(p => { // Do something });
但是,如何在满足标准后获得标准值和n个值(并且这n个值不必与评估标准匹配)
- 例如,我想订阅在
Evaluate(p)
上返回的流以及之后的一个值(然后再次开始评估p
)
请查看IEnumerable的SkipWhile和take扩展方法。您可以尝试以下代码:
changes.SkipWhile(change => Evaluate(change) == false).Take(n).Subscribe(change => { /* do something */ });
编辑
从带有n
项目尾部的序列中获取所有匹配项目的新代码(不重复检索项目)
// Let's assume elements in the sequence are of type Change
int i = 0;
Func<Change, bool> evaluateWithTail = change =>
{
if (i <= 0 || i > n)
{
i = Evaluate(change) ? 1 : 0;
}
else
{
i++;
}
return i > 0;
}
// Please note delegate is specified as parameter directly - without lambda expression
changes.Where(evaluateWithTail).Subscribe(change => { /* do something */ });
下面是另一个稍短的实现:
var filtered = source
.SkipWhile( x => !Criteria(x) )
.Take(3)
.Repeat()
我不认为您可以通过组合现有运算符来创建Rx运算符,因为本质上您想要的是使用Where
运算符,但在它匹配后,您想要为接下来的N个元素"关闭它"。好的,显然你可以使用Repeat
运算符,这证明了Rx是多么可组合
无论如何,您也可以使用创建自己的Rx运营商的最佳实践创建一个新的运营商:
static class Extensions {
public static IObservable<T> WhereThenTake<T>(
this IObservable<T> source,
Predicate<T> predicate,
Int32 count
) {
if (source == null)
throw new ArgumentNullException("source");
if (predicate == null)
throw new ArgumentNullException("predicate");
if (count < 0)
throw new ArgumentException("count");
return Observable.Create<T>(
observer => {
var finished = false;
var n = 0;
var disposable = source.Subscribe(
x => {
if (!finished) {
if (n > 0) {
observer.OnNext(x);
n -= 1;
}
else if (predicate(x)) {
n = count;
observer.OnNext(x);
}
}
},
ex => { finished = true; observer.OnError(ex); },
() => { finished = true; observer.OnCompleted(); }
);
return disposable;
}
);
}
}
然后这样使用它(Evaluate
是谓词,n
是谓词匹配后要通过的项数):
changes.WhereThenTake(Evaluate, n).Subscribe( ... );