我正在尝试以(我认为是(标准节流方法的不同方式"限制"IObservable。
我想忽略流中第一个未忽略的值之后的 1 秒的值。
例如,如果 1s=5 短划线
source: --1-23--45-----678901234
result: --1-----4------6----1---
关于如何实现这一目标的任何想法?
这是在 Rx 中执行此操作的惯用方法,作为扩展方法 - 下面是使用您的场景的说明和示例。
所需函数的工作方式与Observable.Throttle
非常相似,但一旦到达就会发出合格事件,而不是在油门或采样周期的持续时间内延迟。在合格事件后的给定持续时间内,后续事件将被禁止:
public static IObservable<T> SampleFirst<T>(
this IObservable<T> source,
TimeSpan sampleDuration,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return source.Publish(ps =>
ps.Window(() => ps.Delay(sampleDuration,scheduler))
.SelectMany(x => x.Take(1)));
}
这个想法是使用窗口的重载,该重载使用窗口关闭选择器创建不重叠的窗口,该选择器使用由 sampleDuration 时移回来的源。因此,每个窗口将:(a( 由其中的第一个元素关闭,(b( 保持打开状态,直到允许新元素。然后,我们只需从每个窗口中选择第一个元素。
在下面的示例中,我完全重复了您的测试场景,将一个"破折号"建模为 100 个即时报价。请注意,延迟被指定为 499 个时钟周期而不是 500 个时钟周期,因为多个调度程序之间的传递事件会导致 1 个时钟周期漂移 - 实际上,您不需要详述这一点,因为单个时钟周期解析不太可能有意义。ReactiveTest
类和OnNext
帮助程序方法通过包含 Rx 测试框架 nuget 包rx-testing
来实现:
public class Tests : ReactiveTest
{
public void Scenario()
{
var scheduler = new TestScheduler();
var test = scheduler.CreateHotObservable<int>(
// set up events as per the OP scenario
// using 1 dash = 100 ticks
OnNext(200, 1),
OnNext(400, 2),
OnNext(500, 3),
OnNext(800, 4),
OnNext(900, 5),
OnNext(1500, 6),
OnNext(1600, 7),
OnNext(1700, 8),
OnNext(1800, 9),
OnNext(1900, 0),
OnNext(2000, 1),
OnNext(2100, 2),
OnNext(2200, 3),
OnNext(2300, 4)
);
test.SampleFirst(TimeSpan.FromTicks(499), scheduler)
.Timestamp(scheduler)
.Subscribe(x => Console.WriteLine(
"Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value));
scheduler.Start();
}
}
请注意,输出符合你的方案:
Time: 200 Value: 1
Time: 800 Value: 4
Time: 1500 Value: 6
Time: 2000 Value: 1
这应该可以解决问题。可能会有一个更短的实现。
Scan
中的累积存储最后保留Item
的Timestamp
,并标记是否Keep
每个项目。
public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan duration)
{
return observable
.Timestamp()
.Scan(
new
{
Item = default(T),
Timestamp = DateTimeOffset.MinValue,
Keep = false
},
(a, x) =>
{
var keep = a.Timestamp + duration <= x.Timestamp;
return new
{
Item = x.Value,
Timestamp = keep ? x.Timestamp : a.Timestamp,
Keep = keep
};
}
})
.Where(a => a.Keep)
.Select(a => a.Item);
}