GroupBy然后ObserveOn会丢失项目



在LinqPad:中尝试此操作

Observable
    .Range(0, 10)
    .GroupBy(x => x % 3)
    .ObserveOn(Scheduler.NewThread)
    .SelectMany(g => g.Select(x => g.Key + " " + x))
    .Dump()

结果显然是不确定的,但在每种情况下,我都无法收到全部10个项目。我目前的理论是,当管道编组到新线程时,项目正在通过分组的可观察到的未观察到的。

Linqpad不知道你正在运行所有这些线程——它会立即到达代码的末尾(记住,Rx语句并不总是同步执行,这就是想法!),等待几毫秒,然后以吹走AppDomain及其所有线程(这些线程还没有跟上)结束。尝试添加线程。睡眠到最后,让新线程有时间跟上进度。

顺便说一句,Scheduler。NewThread是一个效率非常低的调度器,EventLoopScheduler(只创建一个线程)或scheduler。TaskPool(使用TPL池,就好像你为每个项目创建了一个Task)要高效得多(当然,在这种情况下,因为你只有10个项目,Scheduler.Immediate是最好的!)

在GroupBy操作中启动对新组的订阅和延迟实现新订阅之间的时间问题。如果将迭代次数从10次增加到100次,那么在一段时间后应该会开始看到一些结果。

此外,如果将GroupBy更改为。在(x=>x%3==0)中,您可能会注意到没有丢失任何值,因为对IObservable组的动态订阅不需要初始化新的观察者。

相关内容

  • 没有找到相关文章