我需要处理来自EventLog的事件。使用附加到 EventRecordWrite 事件的 EventLogWatcher 很容易做到这一点。但是,我感兴趣的查询(eventid == 299 || eventid == 500)提供了比我需要的更多的事件。下面是流的示例
Event ID Correlation ID
299 1AD... (this is actually a guid) X1
500 1AD... X2
500 1AD...
500 1AD...
299 43B... Y1
299 EDB... Z1
500 43B... Y2
500 EDB... Z2
500 43B...
500 43B...
我对事件 299 和与 299 事件的相关 ID 匹配的第一个事件 500 感兴趣。我在上面的流中标记了它们,这是我感兴趣的输出集合:[X1, X2, Y1, Y2, Z1, Z2]
这可能是一个字典,使用相关 id 作为键,EventRecord
元组作为值
{ 1AD.., <X1, X2> }
{ 43B.., <Y1, Y2> }
{ EDB.., <Z1, Z2> }
在一般情况下,事件可能会按顺序排列(299 然后是 500),但在高并发情况下,我预计两个 299 事件会在一起,然后是 500 个事件,所以我不想依赖事件的顺序。相关 id 是关联它们的关键(这是事件eventRecord.Properties[0]
的第一个属性)
我认为这可以通过状态机来解决,但看看是否有人想出一个由对可观察量的查询表示的 Rx 的解决方案会很有趣。这会将模式匹配逻辑保留在一个位置。
更新:这是解决方案的答案。谢谢吉迪恩,这正是我需要的起点!
var pairs = events
.Where(e299 => e299.EventArgs.EventRecord.Id == 299)
.SelectMany(e299 => events.Where(e500 => e500.EventArgs.EventRecord.Id == 500 &&
e299.EventArgs.EventRecord.Properties[0].Value.ToString() ==
e500.EventArgs.EventRecord.Properties[0].Value.ToString())
.Take(1),
(e299, e500) => new { First = e299, Second = e500 });
提前感谢,马蒂亚斯
如果 299 个事件总是在 500 个事件之前出现,那么SelectMany
和Where
就足够了。
var events; //declared somewhere
var pairs = from e299 in events
where e299.eventid == 299
from e500 in events
where e500.eventid == 500 &&
e299.Correlation == e500.Correlation
select new with {Correlation = e299.Correlation,
First = e299,
Second = e500}
如果您的源对于每个相关的 299 事件有多个 500 事件,您可能需要切换到 lambda 语法并向第二个订阅添加Take(1)
。
您应该考虑使用 Observable.When,以及 Joins。这将允许您根据需要编写复杂的连接模式。
例如,请参阅 https://stackoverflow.com/a/3868608/13131 和 System.Reactive.Join 指南。
你说的"状态机"是什么意思。
但是你可以写一个收集的观察器(eventid == 299 || eventid == 500)。使用限制策略(收集最新的 1000 个事件,或者最近的一分钟,我不知道),当它检测到一对事件 299、500 具有相同的 guid 时,它会引发一个事件,然后从其内部字典/事件列表中删除。
然后,这种善良的观察者是系统其余部分的可观察对象。
也许可以收集事件的列表,当检测到新事件时,类似于
事件。Where(e => e.eventid == newevent.eventid && ((e.id == 299 && newev.id == 500) ||(e.id == 500 &&newev.id == 299))
在性能上可能就足够了,而不是按 GUID 使用字典。
以后见,马修!