IObservable - 在一段时间内忽略新元素



我正在尝试以(我认为是(标准节流方法的不同方式"限制"IObservable。
我想忽略流中第一个未忽略的值之后的 1 秒的值。

例如,如果 1s=5 短划线

source: --1-23--45-----678901234
result: --1-----4------6----1---

关于如何实现这一目标的任何想法?

这是在 Rx 中执行此操作的惯用方法,作为扩展方法 - 下面是使用您的场景的说明和示例。

所需函数的工作方式与Observable.Throttle非常相似,但一旦到达就会发出合格事件,而不是在油门或采样周期的持续时间内延迟。在合格事件后的给定持续时间内,后续事件将被禁止:

public static IObservable<T> SampleFirst<T>(
    this IObservable<T> source,
    TimeSpan sampleDuration,
    IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return source.Publish(ps => 
        ps.Window(() => ps.Delay(sampleDuration,scheduler))
          .SelectMany(x => x.Take(1)));
}
这个想法是使用窗口的重载,该重载使用窗口

关闭选择器创建不重叠的窗口,该选择器使用由 sampleDuration 时移回来的源。因此,每个窗口将:(a( 由其中的第一个元素关闭,(b( 保持打开状态,直到允许新元素。然后,我们只需从每个窗口中选择第一个元素。

在下面的示例中,我完全重复了您的测试场景,将一个"破折号"建模为 100 个即时报价。请注意,延迟被指定为 499 个时钟周期而不是 500 个时钟周期,因为多个调度程序之间的传递事件会导致 1 个时钟周期漂移 - 实际上,您不需要详述这一点,因为单个时钟周期解析不太可能有意义。ReactiveTest类和OnNext帮助程序方法通过包含 Rx 测试框架 nuget 包rx-testing来实现:

public class Tests : ReactiveTest
{
    public void Scenario()
    {
        var scheduler = new TestScheduler();
        var test = scheduler.CreateHotObservable<int>(    
            // set up events as per the OP scenario
            // using 1 dash = 100 ticks        
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(500, 3),
            OnNext(800, 4),
            OnNext(900, 5),
            OnNext(1500, 6),
            OnNext(1600, 7),
            OnNext(1700, 8),
            OnNext(1800, 9),
            OnNext(1900, 0),
            OnNext(2000, 1),
            OnNext(2100, 2),
            OnNext(2200, 3),
            OnNext(2300, 4)
            );
        test.SampleFirst(TimeSpan.FromTicks(499), scheduler)
            .Timestamp(scheduler)
            .Subscribe(x =>  Console.WriteLine(
                "Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value));
        scheduler.Start();
    }
}

请注意,输出符合你的方案:

Time: 200 Value: 1
Time: 800 Value: 4
Time: 1500 Value: 6
Time: 2000 Value: 1

这应该可以解决问题。可能会有一个更短的实现。

Scan中的累积存储最后保留ItemTimestamp,并标记是否Keep每个项目。

public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan duration)
{
    return observable
        .Timestamp()
        .Scan(
            new
            {
                Item = default(T),
                Timestamp = DateTimeOffset.MinValue,
                Keep = false
            },
            (a, x) =>
            {
                var keep = a.Timestamp + duration <= x.Timestamp;
                return new
                {
                    Item = x.Value,
                    Timestamp = keep ? x.Timestamp : a.Timestamp,
                    Keep = keep
                };
            }
        })
        .Where(a => a.Keep)
        .Select(a => a.Item);
}

相关内容

  • 没有找到相关文章

最新更新