突发数据的反应性积累



问题是:存在一系列数值。值是以突发的方式推送的,因此100个值可以非常接近(时间上(,比如每5-10毫秒,然后可能会停止一段时间,然后可以再次突发。其思想是显示最大长度为500ms的窗口的累积值(总和(。

我的第一次尝试是使用Buffer(500ms(,但这会导致事件(每500ms(不断增加,总和为0(因为累积的缓冲区项目为0(,可以通过空缓冲区进行过滤来解决,但我真的想完全避免这种情况,只有在一段"静默"后实际推送值后才打开缓冲区。

附加限制:实现是UniRx,它不包含所有的Rx运算符,尤其是Window(我怀疑在这种情况下可能有用(,因此解决方案仅限于基本运算符,包括Buffer。

由于您只需要求和,因此使用Buffer是过分的。我们可以运行ScanAggregation

var burstSum =
source
.Scan(0, (acc, current) => acc + current)
.Throttle(TimeSpan.FromMilliseconds(500))
.Take(1)
.Repeat();

这将启动一个累积总和的流,直到该流空闲至少500ms。

但如果我们想至少每一个时间桶都排放,我们就必须走另一条路。我们做了两个假设:

  1. 元素之间的时间间隔之和应等于第一个元素和最后一个元素之间的间隔
  2. 当流完成时,Throttle将释放最后一个值。

    source
    .TimeInterval()
    .Scan((acc, cur) => new TimeInterval<int>(acc.Value + cur.Value, acc.Interval + cur.Interval))
    .TakeWhile(acc => acc.Interval <= TimeSpan.FromMilliseconds(500))
    .Throttle(TimeSpan.FromMilliseconds(500))
    .Select(acc => acc.Value)
    .Take(1)
    .Repeat();
    

相关内容

  • 没有找到相关文章

最新更新