public interface Event
{
Guid identifier;
Timestamp ts;
}
我们正在考虑使用反应式扩展来重写我的金融公司的问题。
前提是我们得到由 Guid(股票代码 + 嵌入其中的唯一性熵(、时间戳和值字段标识的事件。这些以很高的速度出现,直到"至少"在 X 秒(10 秒(之后,我们才能对这些对象采取行动,之后我们必须对它们采取行动,并将它们从系统中删除。
把它想象成两个窗口,一个初始窗口为"10 秒"(例如 T0 到 T10(,我们在其中标识所有唯一事件(基本上按 guid 分组(,然后我们查看下一个"10 秒"、"辅助窗口"(T10-T20(,以确保我们实施"至少"10 秒的策略。从"初始窗口"中,我们删除所有事件(因为我们已经考虑了它们(,然后从"辅助窗口"中删除"初始窗口"中发生的事件。我们继续移动10秒的滑动窗口,所以现在我们看着窗口T20-T30,重复并冲洗。
我如何在 Rx 中实现这一点,因为这似乎是要走的路。
如果您可以依靠服务器时钟和消息中的时间戳(也就是说,我们处于"现实生活"模式(,并且您正在滑动 10 秒延迟而不是跳跃 10 秒窗口,那么您可以将事件延迟 10 秒:
var events = new Subject<Event>();
var delayedEvents = events.Delay(TimeSpan.FromSeconds(10));
检查唯一事件等只是将它们添加到某种集合中:
var guidSet = new HashSet<Guid>();
delayedEvents.Do(e => guidSet.Add(e.identifier));
如果你的问题是你必须等待 10 秒,然后一次处理最后 10 秒,那么你只想缓冲 10 秒:
var bufferedEvents = events.Buffer(TimeSpan.FromSeconds(10));
bufferedEvents.Do(es => { foreach (var e in es) guidSet.Add(e.identifier); });
我还没有展示滑动 10 秒窗口的示例,因为我无法想象这就是您想要的(事件被处理不止一次(。
现在我们认真起来。假设您不想依赖墙时间,而是想利用事件中的时间来驱动逻辑。假设事件被重新定义为:
public class Event
{
public Guid identifier;
public DateTime ts;
}
创建历史调度程序并从原始调度程序提供调度事件:
var scheduler = new HistoricalScheduler();
var driveSchedule = events.Subscribe(e => scheduler.AdvanceTo(e.ts));
var target = events.SelectMany(e => Observable.Timer(e.ts, scheduler).Select(_ => e));
现在你可以简单地在target
上使用常规的Rx组合器而不是event
,只需通过调度器,以便适当地触发它们,例如:
var bufferedEvents = target.Buffer(TimeSpan.FromSeconds(10), scheduler);
这是一个简单的测试。创建一百个事件,每个事件"虚拟"相隔 30 秒,但每秒实时触发:
var now = DateTime.Now;
var test = Enumerable.Range(0,99).Select(i =>
Scheduler.ThreadPool.Schedule(
TimeSpan.FromSeconds(i),
() => events.OnNext(new Event() {
identifier = Guid.NewGuid(),
ts = now.AddSeconds(i * 30)
})
)
).ToList();
订阅它并请求 60 秒的缓冲事件 - 实际上每 2 个"真实"秒(60 虚拟秒(接收 2 个事件:
target.Select(e => String.Format("{0} {1}", e.identifier, e.ts.ToString()))
.Buffer(TimeSpan.FromSeconds(60), scheduler)
.Select(es => String.Join(" - ", es))
.DumpLive();