我有以下类的项目集合:
public class Event
{
public DateTimeOffset Timestamp;
public object Data;
}
我想创建IObservable<Event>
,其中每个项目都在将来Timestamp
时发布。这可以通过Observable.Delay
还是我必须编写自己的IObservable<T>
实现?
我会提到这个结构类似于日志文件。可以有数万个Event
项目,但每秒只能发布 1-2 个。
事实证明,使用可变时间Observable.Delay
过载非常简单:
//given IEnumerable<Event> events:
var observable = events.ToObservable().Delay(ev => Observable.Timer(ev.Timestamp));
虽然我的第一个答案是按预期工作,但对于数十万个事件,创建可观察序列的性能并不理想 - 您需要支付大量的初始化成本(在我的机器上大约 10 秒)。
为了提高性能,利用数据已经排序的性质,我实现了自定义IEnumerable<Event>
,该循环遍历事件,在事件之间生成和休眠。使用此IEnumerable
可以轻松调用ToObservable<T>
并且它按预期工作:
IObservable<Event> CreateSimulation(IEnumerable<Event> events)
{
IEnumerable<Event> simulation()
{
foreach(var ev in events)
{
var now = DateTime.UtcNow;
if(ev.Timestamp > now)
{
Thread.Sleep(ev.Timestamp - now);
}
yield return ev;
}
}
return simulation().ToObservable();
}
似乎 Rx 库缺乏一种机制 将IEnumerable<T>
转换为IObservable<T>
,通过懒惰地枚举它并时移其元素。下面是一个自定义实现。这个想法是用主题Zip
可枚举的源,并通过在适当的时候向主题发送OnNext
通知来控制枚举。
/// <summary>Converts an enumerable sequence to a time shifted observable sequence,
/// based on a time selector function for each element.</summary>
public static IObservable<T> ToObservable<T>(
this IEnumerable<T> source,
Func<T, DateTimeOffset> dueTimeSelector,
IScheduler scheduler = null)
{
scheduler ??= Scheduler.Default;
return Observable.Defer(() =>
{
var subject = new BehaviorSubject<Unit>(default);
return subject
.Zip(source, (_, x) => x)
.Delay(x => Observable.Timer(dueTimeSelector(x), scheduler))
.Do(_ => subject.OnNext(default));
});
}
之所以选择BehaviorSubject
,是因为它具有初始值,因此它可以自然地使车轮运动。
Observable.Defer
运算符用于防止多个订阅共享同一状态(在本例中为subject
)并相互干扰。有关此内容的更多信息,请点击此处。
使用示例:
IEnumerable<Event> events = GetEvents();
IObservable<Event> observable = events.ToObservable(x => x.Timestamp);