反应性延长滑动时间窗口



我有一系列的股票报价,我想获取最后一个小时的所有数据并对其进行一些处理。我正试图通过反应式扩展2.0来实现这一点。我在另一篇文章中读到使用Interval,但我认为这是不推荐的。

这个扩展方法能解决你的问题吗?

public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this,
    TimeSpan buffering)
{
    return Observable.Create<T[]>(o =>
    {
        var list = new LinkedList<Timestamped<T>>();
        return @this.Timestamp().Subscribe(tx =>
        {
            list.AddLast(tx);
            while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
            {
                list.RemoveFirst();
            }
            o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}

您正在寻找Window运算符!这是我写的一篇关于处理重合序列(重叠序列窗口)的长文http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

因此,如果你想建立一个滚动平均值,你可以使用这种代码

var scheduler = new TestScheduler();
var notifications = new Recorded<Notification<double>>[30];
for (int i = 0; i < notifications.Length; i++)
{
  notifications[i] = new Recorded<Notification<double>>(i*1000000, Notification.CreateOnNext<double>(i));
}
//Push values into an observable sequence 0.1 seconds apart with values from 0 to 30
var source = scheduler.CreateHotObservable(notifications);
source.GroupJoin(
      source,   //Take values from myself
      _=>Observable.Return(0, scheduler), //Just the first value
      _=>Observable.Timer(TimeSpan.FromSeconds(1), scheduler),//Window period, change to 1hour
      (lhs, rhs)=>rhs.Sum())    //Aggregation you want to do.
    .Subscribe(i=>Console.WriteLine (i));
scheduler.Start();

我们可以看到它在接收值时输出滚动和。

0、1、3、6、10、15、21、28…

很可能Buffer就是您想要的:

var hourlyBatch = ticks.Buffer(TimeSpan.FromHours(1));

或者假设数据已经是Timestamp ed,只需使用Scan

    public static IObservable<IReadOnlyList<Timestamped<T>>> SlidingWindow<T>(this IObservable<Timestamped<T>> self, TimeSpan length)
    {
        return self.Scan(new LinkedList<Timestamped<T>>(),
                         (ll, newSample) =>
                         {
                             ll.AddLast(newSample);
                             var oldest = newSample.Timestamp - length;
                             while (ll.Count > 0 && list.First.Value.Timestamp < oldest)
                                 list.RemoveFirst();
                             return list;
                         }).Select(l => l.ToList().AsReadOnly());
    }

https://github.com/Froussios/New-Intro-To-Rx/blob/master/Part%203%20-%20驯服%20序列/5.%20时移%20序列.md#按时间重叠缓冲区

Console.WriteLine($"{DateTime.Now:T}: Start");
Observable
    .Interval(TimeSpan.FromSeconds(1)).Take(5)
    .Buffer(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(1))
    .Subscribe(x => { Console.WriteLine($"{DateTime.Now:T}: {string.Join(", ", x.ToArray())}"); });
await Task.Delay(TimeSpan.FromSeconds(10));
Console.WriteLine($"{DateTime.Now:T}: End");

输出:

    17:07:27: Start
    17:07:29: 0, 1
    17:07:30: 0, 1, 2
    17:07:31: 2, 3
    17:07:32: 3, 4
    17:07:32: 4
    17:07:37: End

相关内容

  • 没有找到相关文章

最新更新