Calculating the maximum over a sliding window of a time series



Input:

public class MyObject
{
public double Value { get; set; }
public DateTime Date { get; set; }
}

A method to generate test objects:

public static MyObject[] GetTestObjects()
{
var rnd = new Random();
var date = new DateTime(2021, 1, 1, 0, 0, 0);
var result = new List<MyObject>();
for (int i = 0; i < 50000; i++)
{
//this is to simulate real data having gaps
if (rnd.Next(100) < 25)
{
continue;
}
var myObject = new MyObject()
{
Value = rnd.NextDouble(),
Date = date.AddMinutes(15 * i)
};
result.Add(myObject);
}
return result.ToArray();
}

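Given this, I need to calculate, for each myObject, the maximum value over the preceding 12 months. I could think of doing this in parallel, but maybe there is an optimized solution?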

Sorry for being unclear. This is what I currently use to get what I want:

public MyObject[] BruteForceBackward(MyObject[] testData)
{
return testData.AsParallel().Select(point =>
{
var max = testData.Where(x => x.Date <= point.Date && x.Date >= point.Date.AddYears(-1)).Max(x => x.Value);
return new MyObject() { Date = point.Date, Value = point.Value / max };
}).OrderBy(r => r.Date).ToArray();
}

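This works, but it is slow and consumes a lot of processor resources (imagine you have 100k objects). I am sure there must be something better.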

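I had a similar project where I had to calculate such things on a large amount of sensor data.

You can now find a more refined version in my GitHub repository, which should be ready to use (.NET): https://github.com/forReason/Statistics-Helper-Library

In general, you want to reduce the number of loops that go over all your data. At best, you want to touch each element only once.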

Process Array (equivalent of BruteForceBackward)

public static MyObject[] FlowThroughForward(ref MyObject[] testData)
{
// generate return array
MyObject[] returnData = new MyObject[testData.Length];
// keep track to minimize processing
double currentMaximum = 0;
List<MyObject> maximumValues = new List<MyObject>();
// go through the elements
for (int i = 0; i < testData.Length; i++)
{
// calculate the oldest date to keep in tracking list
DateTime targetDate = testData[i].Date.AddYears(-1);
// maximum logic
if (testData[i].Value >= currentMaximum)
{
// new maximum found, clear tracking list
// this is the best case scenario
maximumValues.Clear();
currentMaximum = testData[i].Value;
}
else
{
// unfortunately, no new maximum was found
// go backwards the maximum tracking list and check for smaller values
// clear the list of all smaller values. The list should therefore always
// be in descending order
for (int b = maximumValues.Count - 1; b >= 0; b--)
{
if (maximumValues[b].Value <= testData[i].Value)
{
// a lower value has been found. We have a newer, higher value
// clear this waste value from the tracking list
maximumValues.RemoveAt(b);
}
else
{
// there are no more lower values. 
// stop looking for smaller values to save time
break;
}
}
}
// append new value to tracking list, no matter if higher or lower
// all future values might be lower
maximumValues.Add(testData[i]);
// check if the oldest value is too old to be kept in the tracking list
while (maximumValues[0].Date < targetDate)
{
// oldest value is to be removed
maximumValues.RemoveAt(0);
// update maximum
currentMaximum = maximumValues[0].Value;
}
// add object to result list
returnData[i] = new MyObject() { Date = testData[i].Date, Value = testData[i].Value / currentMaximum };
}
return returnData;
}
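
One detail of the list-based version above is that `List<T>.RemoveAt(0)` shifts all remaining elements. The following is a minimal sketch of the same idea using a `LinkedList<int>` of candidate indices, which can drop entries from both ends in constant time. The names `SlidingMaxSketch` and `WindowMax` are made up for illustration, the input is assumed to be sorted by date, and a fixed `TimeSpan` window is assumed in place of `AddYears(-1)`. It returns the window maximum itself rather than the ratio.

```csharp
using System;
using System.Collections.Generic;

public static class SlidingMaxSketch
{
    // For each sample, returns the maximum value among the samples whose
    // date lies in [sample date - window, sample date].
    // Assumption: data is sorted by ascending date and window is non-negative.
    public static double[] WindowMax(IReadOnlyList<(DateTime Date, double Value)> data, TimeSpan window)
    {
        var result = new double[data.Count];
        // Indices of candidate maxima; their values strictly decrease from front to back.
        var candidates = new LinkedList<int>();
        for (int i = 0; i < data.Count; i++)
        {
            // Drop candidates at the back that the new sample makes obsolete.
            while (candidates.Count > 0 && data[candidates.Last.Value].Value <= data[i].Value)
            {
                candidates.RemoveLast();
            }
            candidates.AddLast(i);
            // Drop candidates at the front that have left the window.
            // The list cannot become empty: sample i itself is always inside the window.
            DateTime oldestAllowed = data[i].Date - window;
            while (data[candidates.First.Value].Date < oldestAllowed)
            {
                candidates.RemoveFirst();
            }
            // The front of the list is the maximum of the current window.
            result[i] = data[candidates.First.Value].Value;
        }
        return result;
    }
}
```

Each index is added once and removed at most once, so the whole pass stays linear in the number of samples. Whether this beats the `List<MyObject>` version in practice depends on how long the tracking list gets, so it is worth measuring on real data.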

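Real-time data or streaming data

Note: if you have very large lists, you may run into memory problems with the approach that passes the complete array. In that case, pass one value at a time, from the oldest to the newest, and store the results one at a time. This function can also be used for real-time data.
The test method is included in the code.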

static void Main(string[] args)
{
int length = 50000;

Stopwatch stopWatch1 = new Stopwatch();
stopWatch1.Start();
var myObject = new MyObject();
var result = new List<MyObject>();
var date = new DateTime(2021, 1, 1, 0, 0, 0);
for (int i = 0; i < length; i++)
{
//this is to simulate real data having gaps
if (rnd.Next(100) < 25)
{
continue;
}
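// new object per sample: the tracking list stores references, so a reused instance would corrupt earlier entries
myObject = new MyObject { Value = rnd.NextDouble(), Date = date.AddMinutes(15 * i) };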
result.Add(CalculateNextObject(ref myObject));
}
stopWatch1.Stop();
Console.WriteLine("test code executed in " + stopWatch1.ElapsedMilliseconds + " ms");
Thread.Sleep(1000000);
}
private static Random rnd = new Random();
private static double currentMaximum = 0;
private static List<MyObject> maximumValues = new List<MyObject>();
public static MyObject CalculateNextObject(ref MyObject input)
{
// calculate the oldest date to keep in tracking list
DateTime targetDate = input.Date.AddYears(-1);
// maximum logic
if (input.Value >= currentMaximum)
{
// new maximum found, clear tracking list
// this is the best case scenario
maximumValues.Clear();
currentMaximum = input.Value;
}
else
{
// unfortunately, no new maximum was found
// go backwards the maximum tracking list and check for smaller values
// clear the list of all smaller values. The list should therefore always
// be in descending order
for (int b = maximumValues.Count - 1; b >= 0; b--)
{
if (maximumValues[b].Value <= input.Value)
{
// a lower value has been found. We have a newer, higher value
// clear this waste value from the tracking list
maximumValues.RemoveAt(b);
}
else
{
// there are no more lower values. 
// stop looking for smaller values to save time
break;
}
}
}
// append new value to tracking list, no matter if higher or lower
// all future values might be lower
maximumValues.Add(input);
// check if the oldest value is too old to be kept in the tracking list
while (maximumValues[0].Date < targetDate)
{
// oldest value is to be removed
maximumValues.RemoveAt(0);
// update maximum
currentMaximum = maximumValues[0].Value;
}
// add object to result list
MyObject returnData = new MyObject() { Date = input.Date, Value = input.Value / currentMaximum };
return returnData;
}

Test method

static void Main(string[] args)
{
MyObject[] testData = GetTestObjects();
Stopwatch stopWatch1 = new Stopwatch();
Stopwatch stopWatch2 = new Stopwatch();
stopWatch1.Start();
MyObject[] testresults1 = BruteForceBackward(testData);
stopWatch1.Stop();
Console.WriteLine("BruteForceBackward executed in " + stopWatch1.ElapsedMilliseconds + " ms");
stopWatch2.Start();
MyObject[] testresults2 = FlowThroughForward(ref testData);
stopWatch2.Stop();
Console.WriteLine("FlowThroughForward executed in " + stopWatch2.ElapsedMilliseconds + " ms");
Console.WriteLine();
Console.WriteLine("Comparing some random test results: ");
var rnd = new Random();
for (int i = 0; i < 10; i++)
{
int index = rnd.Next(0, testData.Length);
Console.WriteLine("Index: " + index + " brute: " + testresults1[index].Value + " flow: " + testresults2[index].Value);
}
Thread.Sleep(1000000);
}

Test results

The test was run on a machine with 32 cores, so in theory any multithreaded approach should have the advantage, but see for yourself ;(

Function             Time      Time %
BruteForceBackward   5334 ms   99.9%
FlowThroughForward   5 ms      0.094%

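An interesting and challenging problem. I worked out a solution using a dynamic programming approach (first learned in a CS algorithms class back in '78). First, a tree is constructed that contains pre-calculated local maximum values over recursively defined ranges. Once constructed, the maximum for any arbitrary range can be calculated efficiently, mostly from the pre-calculated values. Only at the fringes of the range does the calculation drop down to the element level.

It is not as fast as julian bechtold's FlowThroughForward method, but random access to ranges may be an advantage.

Code to add to Main: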

Console.WriteLine();
Stopwatch stopWatch3 = new Stopwatch();
stopWatch3.Start();
MyObject[] testresults3 = RangeTreeCalculation(ref testData, 10);
stopWatch3.Stop();
Console.WriteLine($"RangeTreeCalculation executed in {stopWatch3.ElapsedMilliseconds} ms");
... test comparison
Console.WriteLine($"Index: {index} brute: {testresults1[index].Value} flow: {testresults2[index].Value} rangeTree: {testresults3[index].Value}");

Test function:

public static MyObject[] RangeTreeCalculation(ref MyObject[] testDataArray, int partitionThreshold)
{
// For this implementation, we convert the array to a List<MyObject>, because we need a
// reference type object that can be shared.
List<MyObject> testDataList = testDataArray.ToList();
// Construct a tree containing recursive collections of pre-calculated values
var rangeTree = new RangeTree(testDataList, partitionThreshold);
MyObject[] result = new MyObject[testDataList.Count];
Parallel.ForEach(testDataList, (item, state, i) =>
{
var max = rangeTree.MaxForDateRange(item.Date.AddYears(-1), item.Date);
result[i] = new MyObject() { Date = item.Date, Value = item.Value / max };
});
return result;
}

Supporting class:

// Class used to divide and conquer using dynamic programming.
public class RangeTree
{
public List<MyObject> Data; // This reference is shared by all members of the tree
public int Start { get; } // Index of first element covered by this node.
public int Count { get; } // Number of elements covered by this node.
public DateTime FirstDateTime { get; }
public DateTime LastDateTime { get; }
public double MaxValue { get; }  // Pre-calculated max for all elements covered by this node.
List<RangeTree> ChildRanges { get; }
// Top level node constructor
public RangeTree(List<MyObject> data, int partitionThreshold)
: this(data, 0, data.Count, partitionThreshold)
{
}

// Child node constructor, which covers a recursively decreasing range of elements.
public RangeTree(List<MyObject> data, int start, int count, int partitionThreshold)
{
Data = data;
Start = start;
Count = count;
FirstDateTime = Data[Start].Date;
LastDateTime = Data[Start + Count - 1].Date;
if (count <= partitionThreshold)
{
// If the range is smaller than the threshold, just calculate the local max
// directly from the items. No child ranges are defined.
MaxValue = Enumerable.Range(Start, Count).Select(i => Data[i].Value).Max();
}
else
{
// We still have a significant range. Decide how to further divide them up into sub-ranges.
// (There may be room for improvement here to better balance the tree.)
int partitionSize = (count - 1) / partitionThreshold + 1;
int partitionCount = (count - 1) / partitionSize + 1;
if (count < partitionThreshold * partitionThreshold)
{
// When one away from leaf nodes, prefer fewer full leaf nodes over more
// less populated leaf nodes.
partitionCount = (count - 1) / partitionThreshold + 1;
partitionSize = (count - 1) / partitionCount + 1;
}
ChildRanges = Enumerable.Range(0, partitionCount)
.Select(partitionNum => new {
ChildStart = Start + partitionNum * partitionSize,
ChildCount = Math.Min(partitionSize, Count - partitionNum * partitionSize)
})
.Where(part => part.ChildCount > 0) // Defensive
.Select(part => new RangeTree(Data, part.ChildStart, part.ChildCount, partitionThreshold))
.ToList();
// Now is the dynamic programming part:
// Calculate the local max as the max of all child max values.
MaxValue = ChildRanges.Max(child => child.MaxValue);
}
}
// Get the max value for a given range of dates within this rangeTree node.
// This uses the precalculated values as much as possible.
// Only at the fringes of the date range do we calculate at the element level.
public double MaxForDateRange(DateTime fromDate, DateTime thruDate)
{
double calculatedMax = Double.MinValue;
if (fromDate > this.LastDateTime || thruDate < this.FirstDateTime)
{
// Entire range is excluded. Nothing of interest here folks.
calculatedMax = Double.MinValue;
}
else if (fromDate <= this.FirstDateTime && thruDate >= this.LastDateTime)
{
// Entire range is included. Use the already-calculated max.
calculatedMax = this.MaxValue;
}
else if (ChildRanges != null)
{
// We have child ranges. Recurse and accumulate.
// Possible optimization: Calculate max for middle ranges first, and only bother
// with extreme partial ranges if their local max values exceed the preliminary result.
for (int i = 0; i < ChildRanges.Count; ++i)
{
double childMax = ChildRanges[i].MaxForDateRange(fromDate, thruDate);
if (childMax > calculatedMax)
{
calculatedMax = childMax;
}
}
}
else
{
// Leaf range. Loop through just this limited range of elements, checking individually for
// date in range and accumulating the result.
for (int i = 0; i < this.Count; ++i)
{
var element = Data[this.Start + i];
if (fromDate <= element.Date && element.Date <= thruDate && element.Value > calculatedMax)
{
calculatedMax = element.Value;
}
}
}
return calculatedMax;
}
}

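There is still a lot of room for improvement, such as parameterizing the types and generalizing the functionality to support more than just Max(Value), but the framework is there.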
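
To illustrate what such a generalization could look like, here is a minimal sketch that swaps in a plain array-backed segment tree instead of the RangeTree above. The class name `RangeAggregate<T>` and its members are hypothetical. It works on index ranges rather than dates, and it assumes the `combine` function is associative and commutative (true for max, min, and sum). A date range would first have to be mapped to an index range, for example with a binary search over the sorted dates.

```csharp
using System;

// Array-backed segment tree over a fixed set of items.
// Assumption: combine is associative and commutative (e.g. Math.Max, Math.Min, addition).
public sealed class RangeAggregate<T>
{
    private readonly T[] _tree;              // leaves live at indices [_n, 2 * _n)
    private readonly int _n;
    private readonly Func<T, T, T> _combine;

    public RangeAggregate(T[] items, Func<T, T, T> combine)
    {
        _n = items.Length;
        _combine = combine;
        _tree = new T[2 * _n];
        Array.Copy(items, 0, _tree, _n, _n);
        // Pre-calculate every inner node from its two children, bottom up.
        for (int i = _n - 1; i >= 1; i--)
        {
            _tree[i] = combine(_tree[2 * i], _tree[2 * i + 1]);
        }
    }

    // Aggregate of items[from..to], both ends inclusive.
    public T Query(int from, int to)
    {
        if (from < 0 || to >= _n || from > to)
        {
            throw new ArgumentOutOfRangeException(nameof(from));
        }
        int left = from + _n;
        int right = to + _n + 1;             // exclusive upper bound
        T result = _tree[left++];            // seed with the first leaf, so no identity value is needed
        while (left < right)
        {
            if ((left & 1) == 1) result = _combine(result, _tree[left++]);
            if ((right & 1) == 1) result = _combine(result, _tree[--right]);
            left >>= 1;
            right >>= 1;
        }
        return result;
    }
}
```

With `new RangeAggregate<double>(values, Math.Max)` a call such as `Query(10, 250)` touches only a logarithmic number of pre-calculated nodes. Passing `Math.Min` or `(a, b) => a + b` reuses the same structure for other aggregates.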

Assuming you mean that you need the maximum Value for each month over the last 12 months of result, you can use LINQ:

var beginDateTime = DateTime.Now.AddMonths(-12);
var ans = result.Where(r => r.Date >= beginDateTime).GroupBy(r => r.Date.Month).Select(mg => mg.MaxBy(r => r.Value)).ToList();

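Running some timings, I found that putting AsParallel after result changes the run time from around 16 ms (first run) to around 32 ms, so it is actually slower. It is about the same after Where, and about 23 ms after GroupBy (processing the 12 groups in parallel). At least on my PC, there isn't enough data or enough complexity in the operations for parallelism to pay off, but GroupBy isn't the most efficient either.

Using an array and testing each element, I get the results in about 1.2 ms: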

var maxMOs = new MyObject[12];
foreach (var r in result.Where(r => r.Date >= beginDateTime)) {
var monthIndex = r.Date.Month-1;
if (maxMOs[monthIndex] == null || r.Value > maxMOs[monthIndex].Value)
maxMOs[monthIndex] = r;
}

Note that the results are not in chronological order; if needed, you can offset monthIndex by today's month to order the results.

var maxMOs = new MyObject[12];
var offset = DateTime.Now.Month-11;
foreach (var r in result.Where(r => r.Date >= beginDateTime)) {
// wrap around so the index stays in 0..11, with the current month last
var monthIndex = (r.Date.Month-offset) % 12;
if (maxMOs[monthIndex] == null || r.Value > maxMOs[monthIndex].Value)
maxMOs[monthIndex] = r;
}

A micro-optimization (most useful on repeated runs) is to invert the test and use the null-conditional operator:

if (!(r.Value <= maxMOs[monthIndex]?.Value))

This saves about 0.2 ms on the first run, but up to 0.5 ms on subsequent runs.
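
The inverted test works because a comparison against a null `double?` evaluates to false in C#, so negating it treats an empty slot as "replace". A small sketch of just that rule (the class and method names are made up for illustration):

```csharp
using System;

public static class NullComparisonDemo
{
    // True when candidate should replace the stored maximum.
    // candidate <= null is false, so an empty slot (null) always yields true
    // without a separate null check.
    public static bool IsNewMax(double candidate, double? currentMax)
    {
        return !(candidate <= currentMax);
    }
}
```

Here `IsNewMax(0.3, null)` and `IsNewMax(0.7, 0.5)` are true, while `IsNewMax(0.3, 0.5)` and `IsNewMax(0.5, 0.5)` are false, matching the original `== null || >` test. One difference to be aware of: a candidate of `double.NaN` also passes the inverted test, which the original test would reject.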

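Here is a solution similar to julian bechtold's answer. The difference is that the maximum (and all related variables) is hidden in a separate class whose sole purpose is to keep track of the maximum over the past year. The algorithm is the same; I just use a few LINQ expressions here and there.

We keep track of the maximum in the following class: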

public class MaxSlidingWindow
{
private readonly List<MyObject> _maximumValues;
private double _max;
public MaxSlidingWindow()
{
_maximumValues = new List<MyObject>();
_max = double.NegativeInfinity;
}
public double Max => _max;

public void Add(MyObject myObject)
{
if (myObject.Value >= _max)
{
_maximumValues.Clear();
_max = myObject.Value;
}
else
{
RemoveValuesSmallerThan(myObject.Value);
}
_maximumValues.Add(myObject);
RemoveObservationsBefore(myObject.Date.AddYears(-1));
_max = _maximumValues[0].Value;
}
private void RemoveObservationsBefore(DateTime targetDate)
{
var toRemoveFromFront = 0;
while (toRemoveFromFront < _maximumValues.Count && _maximumValues[toRemoveFromFront].Date < targetDate)
{
toRemoveFromFront++;
}
_maximumValues.RemoveRange(0, toRemoveFromFront);
}
private void RemoveValuesSmallerThan(double targetValue)
{
var maxEntry = _maximumValues.Count - 1;
var toRemoveFromBack = 0;
while (toRemoveFromBack <= maxEntry && _maximumValues[maxEntry - toRemoveFromBack].Value <= targetValue)
{
toRemoveFromBack++;
}
_maximumValues.RemoveRange(maxEntry - toRemoveFromBack + 1, toRemoveFromBack);
}
}

It can be used as follows:

public static MyObject[] GetTestObjects_MaxSlidingWindow()
{
var rnd = new Random();
var date = new DateTime(2021, 1, 1, 0, 0, 0);
var result = new List<MyObject>();
var maxSlidingWindow = new MaxSlidingWindow();
for (int i = 0; i < 50000; i++)
{
//this is to simulate real data having gaps
if (rnd.Next(100) < 25)
{
continue;
}
var myObject = new MyObject()
{
Value = rnd.NextDouble(),
Date = date.AddMinutes(15 * i)
};

maxSlidingWindow.Add(myObject);
var max = maxSlidingWindow.Max;
result.Add(new MyObject { Date = myObject.Date, Value = myObject.Value / max });
}
return result.ToArray();
}

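See the relative timings below. The solution above is slightly faster (timed over 10 million runs), but the difference is barely noticeable:

Relative timings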
