使用反应式扩展重播带时间戳的事件流



我有以下类的项目集合:

public class Event
{
public DateTimeOffset Timestamp;
public object Data;
}

我想创建IObservable<Event>,其中每个项目都在将来Timestamp时发布。这可以通过Observable.Delay还是我必须编写自己的IObservable<T>实现?

我会提到这个结构类似于日志文件。可以有数万个Event项目,但每秒只能发布 1-2 个。

事实证明,使用可变时间Observable.Delay过载非常简单:

//given IEnumerable<Event> events:
var observable = events.ToObservable().Delay(ev => Observable.Timer(ev.Timestamp));

虽然我的第一个答案是按预期工作,但对于数十万个事件,创建可观察序列的性能并不理想 - 您需要支付大量的初始化成本(在我的机器上大约 10 秒)。

为了提高性能,利用数据已经排序的性质,我实现了自定义IEnumerable<Event>,该循环遍历事件,在事件之间生成和休眠。使用此IEnumerable可以轻松调用ToObservable<T>并且它按预期工作:

IObservable<Event> CreateSimulation(IEnumerable<Event> events)
{
IEnumerable<Event> simulation()
{
foreach(var ev in events)
{
var now = DateTime.UtcNow;
if(ev.Timestamp > now)
{
Thread.Sleep(ev.Timestamp - now);
}
yield return ev;          
}
}
return simulation().ToObservable();
}

似乎 Rx 库缺乏一种机制 将IEnumerable<T>转换为IObservable<T>,通过懒惰地枚举它并时移其元素。下面是一个自定义实现。这个想法是用主题Zip可枚举的源,并通过在适当的时候向主题发送OnNext通知来控制枚举。

/// <summary>Converts an enumerable sequence to a time shifted observable sequence,
/// based on a time selector function for each element.</summary>
public static IObservable<T> ToObservable<T>(
this IEnumerable<T> source,
Func<T, DateTimeOffset> dueTimeSelector,
IScheduler scheduler = null)
{
scheduler ??= Scheduler.Default;
return Observable.Defer(() =>
{
var subject = new BehaviorSubject<Unit>(default);
return subject
.Zip(source, (_, x) => x)
.Delay(x => Observable.Timer(dueTimeSelector(x), scheduler))
.Do(_ => subject.OnNext(default));
});
}

之所以选择BehaviorSubject,是因为它具有初始值,因此它可以自然地使车轮运动。

Observable.Defer运算符用于防止多个订阅共享同一状态(在本例中为subject)并相互干扰。有关此内容的更多信息,请点击此处。

使用示例:

IEnumerable<Event> events = GetEvents();
IObservable<Event> observable = events.ToObservable(x => x.Timestamp);

相关内容

  • 没有找到相关文章

最新更新