Rx根据标准进行滤波,并且在标准之后的n个值



仅按标准筛选

changes.Where(p => Evaluate(p)).Subscribe(p => { // Do something });

但是,如何在满足标准后获得标准值和n个值(并且这n个值不必与评估标准匹配)

  • 例如,我想订阅在Evaluate(p)上返回的流以及之后的一个值(然后再次开始评估p

请查看IEnumerable的SkipWhile和take扩展方法。您可以尝试以下代码:

changes.SkipWhile(change => Evaluate(change) == false).Take(n).Subscribe(change => { /* do something */ });

编辑

从带有n项目尾部的序列中获取所有匹配项目的新代码(不重复检索项目)

// Let's assume elements in the sequence are of type Change
int i = 0;
Func<Change, bool> evaluateWithTail = change =>
{
    if (i <= 0 || i > n)
    {
        i = Evaluate(change) ? 1 : 0;
    }
    else
    {
        i++;
    }
    return i > 0;
}
// Please note delegate is specified as parameter directly - without lambda expression
changes.Where(evaluateWithTail).Subscribe(change => { /* do something */ });

下面是另一个稍短的实现:

var filtered = source
  .SkipWhile( x => !Criteria(x) )
  .Take(3)
  .Repeat()

我不认为您可以通过组合现有运算符来创建Rx运算符,因为本质上您想要的是使用Where运算符,但在它匹配后,您想要为接下来的N个元素"关闭它"。好的,显然你可以使用Repeat运算符,这证明了Rx是多么可组合

无论如何,您也可以使用创建自己的Rx运营商的最佳实践创建一个新的运营商:

static class Extensions {
  public static IObservable<T> WhereThenTake<T>(
    this IObservable<T> source,
    Predicate<T> predicate,
    Int32 count
  ) {
    if (source == null)
      throw new ArgumentNullException("source");
    if (predicate == null)
      throw new ArgumentNullException("predicate");
    if (count < 0)
      throw new ArgumentException("count");
    return Observable.Create<T>(
      observer => {
        var finished = false;
        var n = 0;
        var disposable = source.Subscribe(
          x => {
            if (!finished) {
              if (n > 0) {
                observer.OnNext(x);
                n -= 1;
              }
              else if (predicate(x)) {
                n = count;
                observer.OnNext(x);
              }
            }
          },
          ex => { finished = true; observer.OnError(ex); },
          () => { finished = true; observer.OnCompleted(); }
        );
        return disposable;
      }
    );
  }
}

然后这样使用它(Evaluate是谓词,n是谓词匹配后要通过的项数):

changes.WhereThenTake(Evaluate, n).Subscribe( ... );

相关内容

  • 没有找到相关文章

最新更新