我有一个IObservable<T>
序列,其中T
是一个KeyValuePair<TKey, TValue>
,我使用来自System.Reactive.Linq
GroupBy
进行分组。
我想对每个IGroupedObservable<TKey, KeyValuePair<TKey, TValue>>
执行聚合操作,但该聚合被定义为Func<IEnumerable<TValue>, TValue>
。
例如,在这里我想计算每个不同单词出现的次数并将其打印到控制台:
Func<IEnumerable<int>, int> aggregate = x => x.Count();
using (new[] { "one", "fish", "two", "fish" }
.Select(x => new KeyValuePair<string, int>(x, 1))
.ToObservable()
.GroupBy(x => x.Key)
.Select(x => new KeyValuePair<string, IEnumerable<int>>(
x.Key,
x.Select(y => y.Value).ToEnumerable()))
//.SubscribeOn(Scheduler.Default)
.Subscribe(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]")))
{
}
我希望输出与此类似(顺序不重要):
one [1]
fish [2]
two [1]
但相反,它要么阻塞(可能是死锁),要么根本不提供任何输出(当我取消注释 LINQ 语句的SubscribeOn
子句时)。
我试图从实际使用场景中减少上述代码,该场景尝试链接两个 TPL 数据流块但遇到类似的行为:
Func<IEnumerable<int>, int> aggregate = x => x.Sum();
var sourceBlock = new TransformBlock<string, KeyValuePair<string, int>>(x => new KeyValuePair<string, int>(x, 1));
var targetBlock = new ActionBlock<KeyValuePair<string, IEnumerable<int>>>(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]"));
using (sourceBlock.AsObservable()
.GroupBy(x => x.Key)
.Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Select(y => y.Value).ToEnumerable()))
.Subscribe(targetBlock.AsObserver()))
{
foreach (var kvp in new[] { "one", "fish", "two", "fish" })
{
sourceBlock.Post(kvp);
}
sourceBlock.Complete();
targetBlock.Completion.Wait();
}
我知道有框架提供了Sum
和Count
方法,这些方法IObservable<T>
但我只能IEnumerable<T>
聚合函数。
我是否误解了ToEnumerable
,我该怎么做才能解决它?
编辑:IEnumerable<T>
的约束是由我尝试链接的两个数据流块的目标引入的,其签名不是我的要更改的。
GroupBy
的工作方式是这样的:当新元素到达时,它会提取一个键,并查看该键之前是否已经被观察到过。如果没有 - 它会创建新组(新的可观察)并按下密钥和可观察的密钥。关键点是 - 当您订阅GroupBy
并且项目被推送到您的订阅时 - 序列尚未分组。被推送的是组键和另一个可观察的(IGroupedObservable
),该组中的元素将被推送到该组。
您在代码中所做的本质上是订阅GroupBy
然后在GroupBy
订阅中阻止尝试枚举IGroupingObservable
。但此时无法枚举它,因为分组不完整。要使其完整 -GroupBy
应处理整个序列,但不能,因为它在等待订阅处理程序完成时被阻止。订阅处理程序等待GroupBy
完成(阻止尝试枚举尚未就绪的序列)。因此,您陷入了僵局。
如果您尝试引入ObserveOn(Scheduler.Default)
在线程池线程上运行订阅处理程序 - 这将无济于事。它将消除死锁,但会引入竞争条件,您将丢失项目,因为您在开始枚举ToEnumerable
结果时仅订阅单个组。此时可能为时已晚,并且在订阅之前(通过开始枚举)已将某些项目推送到可观察的单个组。这些项目不会重播,因此会丢失。
什么会帮助它确实使用Count()
为IObservable
提供,但由于某种原因,您说您不能这样做。
对于数据流块,可以尝试如下操作:
sourceBlock.AsObservable()
.GroupBy(x => x.Key)
.Select(x => {
var res = new { x.Key, Value = x.Select(y => y.Value).Replay() };
// subscribe right here
// Replay will ensure that no items are missed
res.Value.Connect();
return res;
})
// observe on thread pool threads to not deadlock if necessary
// in the example with datablock in your question - it is not
//.ObserveOn(Scheduler.Default)
// now no deadlock and no missing items
.Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Value.ToEnumerable()))
.Subscribe(targetBlock.AsObserver())