如何获取 IObservable 的计数



我迷路了。

我尝试从一长串数据库记录中获取一些聚合值(主要是Count)。我们曾经使用常规的 linq,但数据量已经变得很大,无法放入内存中。我想简单地将查询转换为IObservable以获得"流"结果。但我一定错过了什么。大多数示例和文档似乎没有考虑到这种情况。也许 Rx 不是正确的工具集。

因此,对于问题的重现,我只是生成一些随机数据。然后,我对其进行分组,并期望每个组中的项目数。

static void Main(string[] args)
{
    // lots of DB records; from IEnumerable to IObservable
    var records= //GetRecords();
    ///*
        Observable.Interval(TimeSpan.FromMilliseconds(11))
                     .Take(100)
                     .Select(i => new { Group = DateTime.Now.Millisecond % 10 });
    //*/
    var result = from r in records
                 group r by r.Group into g
                 select new
                 {
                     Key = g.Key,
                     Count = g.Count()
                 };
    foreach (var item in result.ToEnumerable())
    {
        Console.WriteLine("{0} - {1}", item.Key, item.Count.Wait());
    }
}

结果只会给我第一项的值:

8 - 12
4 - 0
0 - 0
5 - 0
1 - 0
7 - 0
2 - 0
3 - 0
9 - 0
6 - 0

我在这里做错了什么?

Count()返回一个IObservable<int>,并且您订阅它的时间较晚,也就是说,当您从events的所有值都被观察到时。我不是 100% 确定Group的行为,但似乎您需要更早订阅Count()以避免丢失元素。将.ToTask()添加到.Count(),看看会发生什么。这样,打电话Wait()毕竟是有意义的。

相关内容

  • 没有找到相关文章

最新更新