具有计数和时间条件的缓冲区运算符



我有一个非常健谈的序列,我试图通过批量处理事件来提高它的效率。具有时间和计数条件的缓冲区运算符似乎符合我的要求,除了一个小的细微差别。使用此重载时,订阅会在指定的时间延迟后收到通知,无论缓冲区中是否有任何项。这真的很烦人,因为大多数情况下我的订阅从缓冲区运算符那里得到一个空列表。考虑到它是一个多线程应用程序,其中订阅者在 UI 线程上,事实证明这不是批量处理项目的最佳方法。我想知道是否有办法使用可用的运算符创建一个序列,该序列将在缓冲区中存在一定数量的项目或经过一定时间时触发,但当且仅当缓冲区中有任何项目时。我知道我可以做这样的事情:

sequence.Buffer(TimeSpan.FromSeconds(5), 1).Where(e=>e.Count > 0)

但我想知道是否有另一种方法可以做到这一点,因为不知何故我觉得这不是最好的方法。

我看不出有什么理由担心这个 - 你有一个惯用的解决方案。空缓冲区是信息,因此框架实现返回它是合理的。无论如何,任何其他方法都可以有效地在内部做同样的事情。

当我发现自己使用一小群标准运算符时,我经常将它们包装在更具解释性的扩展方法中。 例如:

public static class ObservableExtensions
{
    public static IObservable<IList<T>> ToNonEmptyBuffers<T>(
        this IObservable<T> source,
        TimeSpan timespan,
        int count,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Buffer(timespan, count, scheduler ?? Scheduler.Default)
                     .Where(buffer => buffer.Count > 0);
    }
}

允许:

sequence.ToNonEmptyBuffers(TimeSpan.FromSeconds(5), 1);

为了"Rx-i-ness",我将以下内容放入堆中。

就我个人而言,我认为詹姆斯的回答是足够的(在很多情况下可能更好)。唯一的区别(就输出而言)是缓冲区计时器仅在生成新项时启动。这就是为什么我们不需要过滤掉空缓冲区的原因。话虽如此,这可能不是最有效的解决方案。它只是在这里展示构图的力量。

var batches = source
        .GroupByUntil(
            // This means we're not really grouping, but windowing.
            // granted, if we needed to group our batches, this is useful!
            x => 0,
            group => Observable.Amb(
                // this means we get a max of 11 per batch
                group.Skip(10),
                // This means we get a max batch time of 10 seconds
                group.Take(1).Delay(TimeSpan.FromSeconds(10))
        ))
        // Since GroupByUntil gives us windows, we can ToArray them.
        .SelectMany(x => x.ToArray());

相关内容

  • 没有找到相关文章

最新更新