非常类似于这个问题:Rx IObservable缓冲平滑突发事件,我对平滑可能发生突发事件的observable很感兴趣。
希望下面的图表说明了我的目标:
Raw: A--B--CDE-F--------------G-----------------------
Interval: o--o--o--o--o--o--o--o--o--o--o--o--o--o--o--o--o
Output: A--B--C--D--E--F-----------G---------------------
给定原始流,我希望在有规律的间隔内扩展这些事件。
节流不起作用,因为我最终会丢失原始序列的元素。
如果原始流比计时器更频繁,则Zip工作良好,但如果存在没有原始事件的时间段则失败。
编辑
作为对Dan的回答的回应,Buffer的问题是,如果许多事件在短时间间隔内爆发,那么我接收事件的频率就太高了。下面显示了缓冲区大小为3时可能发生的情况,并将超时配置为所需的时间间隔:
Raw: -ABC-DEF-----------G-H-------------------------------
Interval: o--------o--------o--------o--------o--------o--------
Buffered: ---A---D-------------------G--------------------------
B E H
C F
Desired: ---------A--------B--------C--------D--------E ..etc.
这个怎么样?(受James在评论中提到的答案的启发)…
public static IObservable<T> Regulate<T>(this IObservable<T> source, TimeSpan period)
{
var interval = Observable.Interval(period).Publish().RefCount();
return source.Select(x => Observable.Return(x)
.CombineLatest(interval, (v, _) => v)
.Take(1))
.Concat();
}
它将原始可观察对象中的每个值转换为它自己的可观察对象。CombineLatest
意味着它不会产生值,直到间隔产生值。然后我们只需要从每个可观察对象中取一个值并连接起来。
原始可观察对象中的第一个值延迟一个周期。
看起来你想使用的是Buffer。其中一个重载允许您指定间隔以及缓冲区长度。可以将长度设置为1。
Raw.Buffer(interval, 1);
有关其使用的更多示例,您可以参考IntroToRX站点。