如何在F#中优化此移动平均值计算



我有一个移动平均线,它具有以下特殊性:

  • 每个条目都有一个时间戳,值在时间上分布不均匀,队列长度可能会有很大差异
  • 我没有固定的周期,所以代码必须是灵活的,因为需要几个周期
  • 所使用的周期是一个时间戳,只有超过该时间戳的记录才能使用

这是代码:

module PriceMovingAverage =
// queue duration
let queueDuration = TimeSpan.FromHours(1.)
// moving average queue
let private timestampQueue = Queue<DateTime>()
let private priceQueue     = Queue<float>()
// update moving average
let updateMovingAverage (tradeData: TradeData) =
// add the new price
timestampQueue.Enqueue(tradeData.Timestamp)
priceQueue.Enqueue(float tradeData.Price)
// remove the items older than the price base period
let rec dequeueLoop () =
if timestampQueue.Peek() + queueDuration < tradeData.Timestamp then
timestampQueue.Dequeue() |> ignore
priceQueue.Dequeue() |> ignore
dequeueLoop()
dequeueLoop()

// get the moving average
let getPrice fromTimestamp =
// count how many records to skip
let recordsToSkip =
timestampQueue
|> Seq.takeWhile (fun t -> t < fromTimestamp)
|> Seq.length
// calculate the average of the prices within the time range
try
Some (
priceQueue
|> Seq.skip recordsToSkip
|> Seq.average
|> decimal
)
with _ ->
None

问题是最后一部分:我正在遍历时间戳队列,以查找需要跳过的记录数量。然后我要查看价格记录来计算平均值。

很多CPU时间都花在了第一部分:

let recordsToSkip =
timestampQueue
|> Seq.takeWhile (fun t -> t < fromTimestamp)
|> Seq.length

通过序列然后计算长度是缓慢的。

理想情况下,我只需要使用一个带有循环缓冲区的数组,但问题是,队列的长度可能会根据数据而发生显著变化,因为索引实际上是时间戳,而不是队列中的位置。

我可以把它变成一个列表,而不是一个序列,也许可以获得一些速度,但这意味着每次都要复制整个列表。我以为有两个队列来做平均会更快,但也许这不是真的。

有人知道如何在保持灵活性的同时快速(称为5-10x/秒(吗?


编辑:

合并两个队列得到的结果是:

let getPrice fromTimestamp =
try
Some (
priceQueue
|> Seq.toList
|> List.skipWhile (fun t -> t.Timestamp < fromTimestamp)
|> List.averageBy (fun t -> t.Price)
|> decimal
)
with _ ->
None

它更快了,但仍然非常慢。


编辑:

  • 我在这里做了一个Jupyter笔记本:https://pastebin.com/E3uS6j7T
  • 如果你愿意,我也直接在这里粘贴了测试代码:https://pastebin.com/fK18Wyui

一方面,这就像Asti在回答中所说的那样。另一方面,如果您绝对必须将数据保存在两个独立的队列中,则可以使用Seq.zip:一次性枚举它们

timestampQueue.Enqueue <| DateTime.Parse("2020-10-08T17:30"); priceQueue.Enqueue 100.
timestampQueue.Enqueue <| DateTime.Parse("2020-10-08T18:00"); priceQueue.Enqueue 110.
timestampQueue.Enqueue <| DateTime.Parse("2020-10-08T18:30"); priceQueue.Enqueue 120.
timestampQueue.Enqueue <| DateTime.Parse("2020-10-08T19:00"); priceQueue.Enqueue 130.
Seq.zip timestampQueue priceQueue
|> Seq.filter (fun (t, _) -> t >= DateTime.Parse("2020-10-08T18:00"))
|> Seq.averageBy snd
(* val it : float = 120.0 *)

虽然我不确定,但如果你每秒调用它5-10次,你会有多少队列条目。。。我用1E6条目测试了下面的代码,它非常快。

该代码只是寻址";跳过";问题的一部分,这似乎是问题中的主要问题。该代码对数组使用(手工编制的(二进制搜索,如果不匹配,则返回匹配的索引或后面的索引。

module MovingAverage
let  N = 1000000
let inline findFirstIndexAbove target a =
let upper = Array.length a
let rec  loop lower upper =
let mid = lower + (upper - lower) / 2
//printfn "lower = %d, mid = %d, upper = %d" lower mid upper
if mid = lower
then
if a.[mid] = target
then mid
else upper
else
if a.[mid] < target
then
loop mid upper
else if a.[mid] > target
then loop lower mid
else mid
loop 0 upper
let test1 () =
let ats = Array.init N (fun _ -> System.DateTime.Now)
findFirstIndexAbove (ats.[10]) ats
let test2 () =
let au64 = Array.init N (fun i -> 2UL * uint64 i)
findFirstIndexAbove (au64.[10]+1UL) au64

在我的机器上(Debian 64位,廉价的AMD cpu,使用dotnet fsi作为交互式外壳(而不是fsharpi!(,我分别获得了以下test1()test2()的时序。

test1((;;实数:00:00:00.223,CPU:00:00:00.220,GC第0代,第1代,第2代val-it:int=10
test2((;;实数:00:00:00.0005,CPU:00:00:00.000,GC第0代、第1代、第2代:0val it:int=11

test1((中花费的大部分时间是初始化带有时间戳的数组。

以上作为一个配方,在一个更接近问题的场景中,这里有一个相当反直觉的数组方法:

[<Struct>]
type TradeData =
{
timeStamp : System.DateTime
price : float 
}
let inline skipBeyondOldData target a =
let upper = Array.length a
let rec  loop lower upper =
let mid = lower + (upper - lower) / 2
//printfn "lower = %d, mid = %d, upper = %d" lower mid upper
if mid = lower
then
if a.[mid].timeStamp = target
then mid
else upper
else
if a.[mid].timeStamp < target
then
loop mid upper
else if a.[mid].timeStamp > target
then loop lower mid
else mid
loop 0 upper
let oneHour = System.TimeSpan.FromHours(1.0)
let cyclicUpdate state (currentPrice : TradeData) =
let tnow = System.DateTime.Now;
let tstart = tnow - oneHour
let workingSetStartIndex = skipBeyondOldData tstart state
let state1 = Array.append (state.[workingSetStartIndex..]) [| currentPrice |]
let avgPrice = Array.averageBy (fun td -> td.price) state1
(avgPrice,state1)
let rng = System.Random()
let initialState = Array.init N (fun _ -> { timeStamp = System.DateTime.Now; price = rng.NextDouble(); })

由此产生的时间:

cyclicUpdate initialState{timeStamp=System.DateTime.Now;price=rng.NextDouble((}
Real:00:00.016,CPU:00:00.010,GC第0代,第1代,第2代
val-it:float*TradeData[]=(0.5001679869,…(

skip成本高昂,对于seqlist都是如此。对于大量项目,由于缓存未命中,list不是最佳选择。

使用像Timestamped<T>这样的单一类型来捕获时间戳和值。使用List<T>而不是队列,或者最好使用循环缓冲区。时间戳似乎是按时间顺序添加的,因此使用List.BinarySearch来代替skipWhile,以尽可能快地定位下限。

此外,您可以通过预先计算k个数据点的块的平均值,然后计算位于每个块之外的+/-点的平均值来分摊查询成本。

最新更新