我迷路了。
我尝试从一长串数据库记录中获取一些聚合值(主要是Count
)。我们曾经使用常规的 linq,但数据量已经变得很大,无法放入内存中。我想简单地将查询转换为IObservable以获得"流"结果。但我一定错过了什么。大多数示例和文档似乎没有考虑到这种情况。也许 Rx 不是正确的工具集。
因此,对于问题的重现,我只是生成一些随机数据。然后,我对其进行分组,并期望每个组中的项目数。
static void Main(string[] args)
{
// lots of DB records; from IEnumerable to IObservable
var records= //GetRecords();
///*
Observable.Interval(TimeSpan.FromMilliseconds(11))
.Take(100)
.Select(i => new { Group = DateTime.Now.Millisecond % 10 });
//*/
var result = from r in records
group r by r.Group into g
select new
{
Key = g.Key,
Count = g.Count()
};
foreach (var item in result.ToEnumerable())
{
Console.WriteLine("{0} - {1}", item.Key, item.Count.Wait());
}
}
结果只会给我第一项的值:
8 - 12
4 - 0
0 - 0
5 - 0
1 - 0
7 - 0
2 - 0
3 - 0
9 - 0
6 - 0
我在这里做错了什么?
Count()
返回一个IObservable<int>
,并且您订阅它的时间较晚,也就是说,当您从events
的所有值都被观察到时。我不是 100% 确定Group
的行为,但似乎您需要更早订阅Count()
以避免丢失元素。将.ToTask()
添加到.Count()
,看看会发生什么。这样,打电话Wait()
毕竟是有意义的。