使用RX时,扩展可用运算符的正确方法是什么?
我想建立一些我认为有用的操作。
第一个运算就是一个级数的标准差。
第二个运算是第n个滞后值,即,如果我们滞后2,并且我们的级数是A B C D e F,当F被推动时,滞后将是D,当A被推动时滞后将是空/空,当B被推动时延迟将是空,当C被推动时延迟将是
将这些类型的运算符建立在rx.codeplex.com的内置程序基础上有意义吗?或者有更简单的方法吗?
在惯用Rx中,任意延迟可以由Zip
组成。
let lag (count : int) o =
let someo = Observable.map Some o
let delayed = Observable.Repeat(None, count).Concat(someo)
Observable.Zip(someo, delayed, (fun c d -> d))
对于滚动缓冲区,最有效的方法是简单地使用固定大小的Queue
/ResizeArray
。
let rollingBuffer (size : int) o =
Observable.Create(fun (observer : IObserver<_>) ->
let buffer = new Queue<_>(size)
o |> Observable.subscribe(fun v ->
buffer.Enqueue(v)
if buffer.Count = size then
observer.OnNext(buffer.ToArray())
buffer.Dequeue() |> ignore
)
)
对于numbers |> rollingBuffer 3 |> log
:
seq [0L; 1L; 2L]
seq [1L; 2L; 3L]
seq [2L; 3L; 4L]
seq [3L; 4L; 5L]
...
对于相邻值的配对,您只需使用Observable.pairwise
let delta (a, b) = b - a
let deltaStream = numbers |> Observable.pairwise |> Observable.map(delta)
如果要应用滚动计算,Observable.Scan
会更简洁。
其中一些比其他更容易(像往常一样)。对于计数(而不是时间)的"滞后",您只需使用Observable创建一个滑动窗口。缓冲区大小等于"lag",然后取结果列表的第一个元素。
到目前为止滞后=3,函数为:
obs.Buffer(3,1).Select(l => l.[0])
这很容易转化为一个扩展函数。我不知道它是否有效,因为它重复使用了相同的列表,但在大多数情况下,这并不重要。我知道你想要F#,翻译很简单。
对于运行聚合,通常可以使用Observable.Scan
来获取"正在运行"的值。这是根据迄今为止看到的所有值来计算的(实现起来非常简单)——也就是说,你所要实现的每个后续元素都是上一个聚合和新元素。
如果出于任何原因,您需要一个基于滑动窗口的运行聚合,那么我们将进入更困难的领域。在这里,你首先需要一个可以给你一个滑动窗口的操作——上面的Buffer涵盖了这一点。但是,您需要知道哪些值已被从此窗口中删除,哪些值已添加。
因此,我推荐一个新的Observable函数,它在现有窗口+新值的基础上维护一个内部窗口,并返回新窗口+移除值+添加值。你可以用Observable来写这个。扫描(为了高效实现,我建议使用内部队列)。它应该采用一个函数来确定在给定新值的情况下删除哪些值(这样就可以通过时间或计数对其进行参数化)。
当时,Observable。扫描可以再次用于获取旧的聚合+窗口+删除的值+添加的值,并提供新的聚合。
希望这能有所帮助,我确实意识到这是一大堆文字。如果你能确认需求,我可以帮助你为特定用例提供实际的扩展方法。
对于lag
,您可以执行类似的操作
module Observable =
let lag n obs =
let buf = System.Collections.Generic.Queue()
obs |> Observable.map (fun x ->
buf.Enqueue(x)
if buf.Count > n then Some(buf.Dequeue())
else None)
此:
Observable.Range(1, 9)
|> Observable.lag 2
|> Observable.subscribe (printfn "%A")
|> ignore
打印:
<null>
<null>
Some 1
Some 2
Some 3
Some 4
Some 5
Some 6
Some 7