有效平均(移动平均)



我有一个给定(恒定)频率的数据流(整数)。我需要不时地计算不同的平均值(预定义)。我正在寻找快速高效的解决方案。

假设:

  • 采样率是恒定的(预定义的),可能在125-500 SPS之间
  • 我需要计算的平均值是预定义的,它可能是一个或多个平均值(例如,仅最后200ms的平均值或最后250ms和最后500ms)。可能有很多平均值,但它们是预定义的
  • 在任何时候,我都需要能够计算当前的平均值(实时)

我现在拥有的:

  • 我假设在特定的时间段内,总是会有相同数量的数据。因此,在频率为100SPS的情况下,我假设一秒钟正好包含100个值
  • 创建了具有恒定长度的队列(类似于缓冲区)
  • 对于每个定义的平均值,将创建Sum变量
  • 每次新样品到达时,我都会把它放在队列中
  • 每次我在队列中有新样本时,我都会将其值添加到我拥有的每个Sum变量中,并删除窗口外元素的值(基于队列中的位置)
  • 一旦我需要计算平均值,我只需要取特定的Sum变量,并将其除以这个Sum应该包含的元素数量

为了让你更好地了解,我现在有一个代码:

public class Buffer<T> : LinkedList<T>
{
private readonly int capacity;
public bool IsFull => Count >= capacity;
public Buffer(int capacity)
{
this.capacity = capacity;
}
public void Enqueue(T item)
{
if (Count == capacity)
{
RemoveFirst();
}
AddLast(item);
}
}

public class MovingAverage
{
private readonly Buffer<float> Buffer;
private static readonly object bufferLock = new object();
public Dictionary<string, float> Sums { get; private set; }
public Dictionary<string, int> Counts { get; private set; }
public MovingAverage(List<int> sampleCounts, List<string> names)
{
if (sampleCounts.Count != names.Count)
{
throw new ArgumentException("Wrong Moving Averages parameters");
}
Buffer = new Buffer<float>(sampleCounts.Max());
Sums = new Dictionary<string, float>();
Counts = new Dictionary<string, int>();
for (int i = 0; i < names.Count; i++)
{
Sums[names[i]] = 0;
Counts[names[i]] = sampleCounts[i];
}
}

public void ProcessAveraging(float val)
{
lock (bufferLock)
{
if (float.IsNaN(val))
{
val = 0;
}
foreach (var keyVal in Counts.OrderBy(a => a.Value))
{
Sums[keyVal.Key] += val;
if (Buffer.Count >= keyVal.Value)
{
Sums[keyVal.Key] -= Buffer.ElementAt(Buffer.Count - keyVal.Value);
}
}
Buffer.Enqueue(val);
}
}
public float GetLastAverage(string averageName)
{
lock (bufferLock)
{
if (Buffer.Count >= Counts[averageName])
{
return Sums[averageName] / Counts[averageName];
}
else
{
return Sums[averageName] / Buffer.Count;
}
}
}
}

这真的很好,而且足够快,但在现实世界中,有100个SPS并不意味着你总是在1秒内有100个样本。有时是100,有时是99,有时是101。计算这些平均值对我的系统至关重要,一个样本或多或少可能会发生很大变化。这就是为什么我需要一个真正的计时器来告诉我样本是否已经超出移动平均窗口。

将时间戳添加到每个样本的想法似乎很有希望

这里有很多答案。。不妨再加一个:)

这个可能需要一些小的调试;关一";etc-我没有一个真正的数据集可以使用,所以也许可以将其视为伪代码

它和你的一样:有一个环形缓冲区-给它足够的容量来容纳N个样本,其中N足以检查你的移动平均线-100个SPS,想要检查250毫秒,我认为你至少需要25个,但我们的空间并不短缺,所以你可以使它更

struct Cirray
{
long _head;
TimedFloat[] _data;
public Cirray(int capacity)
{
_head = 0;
_data = new TimedFloat[capacity];
}
public void Add(float f)
{
_data[_head++%_data.Length] = new TimedFloat() { F = f };
}
public IEnumerable<float> GetAverages(int[] forDeltas)
{
double sum = 0;
long start = _head - 1;
long now = _data[start].T;
int whichDelta = 0;
for (long idx = start; idx >= 0 && whichDelta < forDeltas.Length; idx--)
{
if (_data[idx % _data.Length].T < now - forDeltas[whichDelta])
{
yield return (float)(sum / (start - idx));
whichDelta++;
}
sum += _data[idx % _data.Length].F;
}
}
}
struct TimedFloat
{
[DllImport("Kernel32.dll", CallingConvention = CallingConvention.Winapi)]
private static extern void GetSystemTimePreciseAsFileTime(out long filetime);

private float _f;
public float F { get => _f;
set {
_f = value;
GetSystemTimePreciseAsFileTime(out long x);
T = DateTime.FromFileTimeUtc(x).Ticks;
}
}
public long T;
}

正常的DateTime.UtcNow不是很精确,大约是16毫秒,所以如果你说即使是一个样本也可以丢弃它,那么对这样的数据进行时间戳可能没有好处。相反,我们可以制作它,这样我们就可以获得相当于高分辨率计时器的刻度,如果您的系统支持它(如果不支持,您可能不得不更改系统,或者滥用StopWatch类来提供更高分辨率的补充),并且我们正在对每个数据项进行时间戳。

我考虑过保持N个不断移动的指针到数据的各个尾端的复杂性,以及dec/递增N个和的复杂性——这仍然可以做到(你清楚地知道如何做到),但你的问题读起来就像你可能很少调用平均值,以至于N个和/计数的解决方案会花更多的时间来维护计数只是时不时地跑250或500个浮子,然后把它们加起来。因此,GetAverages采用一个刻度(每毫秒10000个)的数组来表示您想要的数据范围,例如,以50为步长的50ms到250ms的new[] { 50 * 10000, 100 * 10000, 150 * 10000, 200 * 10000, 250 * 10000 },它从当前头开始并向后求和,直到它将打破时间边界的点(这可能是一位的偏差),然后它产生该时间跨度的平均值,然后在下一个时间跨度内继续求和和和计数(由开始的数学减去当前索引得出的计数)。。我想我正确地理解了你想要的,例如";最后50ms的平均值";以及";最后100ms的平均值";,而不是";最近50ms的平均值";以及";最近的"0"之前50ms的平均值;

编辑:

思考了更多并做到了这一点:

结构Cirray{多头;TimedFloat[]数据;RunningAverage[]_ravgs;

public Cirray(int capacity)
{
_head = 0;
_data = new TimedFloat[capacity];
}
public Cirray(int capacity, int[] deltas) : this(capacity)
{
_ravgs = new RunningAverage[deltas.Length];
for (int i = 0; i < deltas.Length; i++)
_ravgs[i] = new RunningAverage() { OverMilliseconds = deltas[i] };
}
public void Add(float f)
{
//in c# every assignment returns the assigned value; capture it for use later
var addedTF = (_data[_head++ % _data.Length] = new TimedFloat() { F = f });
if (_ravgs == null)
return;
foreach (var ra in _ravgs)
{
//add the new tf to each RA
ra.Count++;
ra.Total += addedTF.F;
//move the end pointer in the RA circularly up the array, subtracting/uncounting as we go
var boundary = addedTF.T - ra.OverMilliseconds; 
while (_data[ra.EndPointer].T < boundary) //while the sample is timed before the boundary, move the
{
ra.Count--; 
ra.Total -= _data[ra.EndPointer].F;
ra.EndPointer = (ra.EndPointer + 1) % _data.Length; //circular indexing
}
}
}
public IEnumerable<float> GetAverages(int[] forDeltas)
{
double sum = 0;
long start = _head - 1;
long now = _data[start].T;
int whichDelta = 0;
for (long idx = start; idx >= 0 && whichDelta < forDeltas.Length; idx--)
{
if (_data[idx % _data.Length].T < now - forDeltas[whichDelta])
{
yield return (float)(sum / (start - idx));
whichDelta++;
}
sum += _data[idx % _data.Length].F;
}
}
public IEnumerable<float> GetAverages() //from the built ins
{
foreach (var ra in _ravgs)
{
if (ra.Count == 0)
yield return 0;
else
yield return (float)(ra.Total / ra.Count);
}
}
}

当然还没有测试过,但它在评论中体现了我的想法

与其使用链表,不如使用一些内部函数作为数组副本。在这个答案中,我为您的缓冲区类提供了一个可能的重写。接管在每个位置保持一个和的想法。

这个缓冲区跟踪所有的总和,但为了做到这一点,它需要用新值对每个项进行总和。根据你需要获得平均值的频率,最好在你需要的时候进行总结,只保留单个值。

无论如何,我只是想指出如何使用Array。复制

public class BufferSum
{
private readonly int _capacity;
private readonly int _last;
private float[] _items;
public int Count { get; private set; }
public bool IsFull => Count >= _capacity;
public BufferSum(int capacity)
{
_capacity = capacity;
_last = capacity - 1;
_items = new float[_capacity];
}
public void Enqueue(float item)
{
if (Count == _capacity)
{
Array.Copy(_items, 1, _items, 0, _last);
_items[_last] = 0;
}
else
{
Count++;
}
for (var i = 0; i < Count; i ++)
{
_items[i] += item;
}
}
public float Avarage => _items[0] / Count;
public float AverageAt(int ms, int fps)
{
var _pos = Convert.ToInt32(ms / 1000 * fps);
return _items[Count - _pos] / _pos; 
}
}

另外,要小心锁定语句,这将花费大量时间。

制作一个大小为500的数组,int计数器c

For every sample:
summ -= A[c % 500]  //remove old value
summ += sample 
A[c % 500] = sample  //replace it with new value
c++
if needed, calculate
average = summ / 500

您总是希望删除序列一侧最旧的元素,并在序列的另一侧添加一个新元素:您需要一个队列而不是堆栈。

我认为圆形列表会更快:只要你没有最大大小,只需添加元素,一旦你有了最大大小,就替换最旧的元素。

这似乎是一个很好的可重用类。稍后我们将添加移动平均线部分。

class RoundArray<T>
{
public RoundArray(int maxSize)
{
this.MaxSize = maxSize;
this.roundArray = new List<T>(maxSize);
}
private readonly int maxSize;
private readonly List<T> roundArray;
public int indexOldestItem = 0;
public void Add(T item)
{
// if list not full, just add
if (this.roundArray.Count < this.maxSize)
this.roundArray.Add(item);
else
{
// list is full, replace the oldest item:
this.roundArray[oldestItem] = item;
oldestItem = (oldestItem + 1) % this.maxSize;
} 
public int Count => this.roundArray.Count;
public T Oldest => this.roundArray[this.indexOldestItem];               
}
}

为了使这个类有用,添加方法来枚举数据,从最旧或最新开始,考虑添加其他有用的可重用方法。也许您应该实现IReadOnlyCollection<T>。也许有些私人领域应该有公共财产。

移动平均计算器将使用此RoundArray。每当添加了一个项目,而您的roundArray尚未满时,该项目就会添加到总和和roundArray中。

如果roundArray已满,则该项将替换最旧的项。您可以从总和中减去OldestItem的值,然后将新Item添加到总和中。

class MovingAverageCalculator
{
public MovingAverageCalculator(int maxSize)
{
this.roundArray = new RoundArray<int>(maxSize);
}
private readonly RoundArray<int> roundArray;
private int sum = 0;
private int Count => this.RoundArray.Count;
private int Average => this.sum / this.Count;
public voidAdd(int value)
{
if (this.Count == this.MaxSize)
{
// replace: remove the oldest value from the sum and add the new one
this.Sum += value - this.RoundArray.Oldest;
}
else
{
// still building: just add the new value to the Sum
this.Sum  += value;
}
this.RoundArray.Add(value);
}
}

累计和

对于大约1000个元素的每个块,计算一系列累积和1。(可能会少一些,但500或1000没有太大区别,这样会更舒适)你想抓住每个方块,只要里面至少有一个元素是相关的。然后可以回收2

当您需要当前金额,并且您在一个区块内时,您需要的金额为:
block[max_index] - block[last_relevant_number]

如果您处于两个区块的边界b1,b2,则您需要的总和为:
b1[b1.length - 1] - b1[last_relevant_number] + b2[max_index]

我们完了。这种方法的主要优点是,您不需要事先知道要保留多少元素,并且可以随时计算结果
您也不需要处理元素的删除,因为当您回收段时,您会自然地覆盖它们-保留索引就是您所需要的。

示例:让我们有一个恒定的时间序列ts = [1,1,1, .... 1]。该系列的累计和将为cumsum = [1,2,3 ... n]。从ts的第i个到第j个(包括第j个)元素的和将是cumsum[j]-cumsum[i-1]=j-i-1。对于i=5,j=6,它将是6-4=2,这是正确的。


1对于数组[1,2,3,4,5],这些将是[1,3,6,10,15]-只是为了完整性
2既然你提到了~500个元素,那么两个块就足够了。

相关内容

最新更新