迭代从 RX 中的 IGroupedObservable 中选择的 IEnumerable



我有一个IObservable<T>序列,其中T是一个KeyValuePair<TKey, TValue>,我使用来自System.Reactive.LinqGroupBy进行分组。

我想对每个IGroupedObservable<TKey, KeyValuePair<TKey, TValue>>执行聚合操作,但该聚合被定义为Func<IEnumerable<TValue>, TValue>

例如,在这里我想计算每个不同单词出现的次数并将其打印到控制台:

Func<IEnumerable<int>, int> aggregate = x => x.Count();
using (new[] { "one", "fish", "two", "fish" }
.Select(x => new KeyValuePair<string, int>(x, 1))
.ToObservable()
.GroupBy(x => x.Key)
.Select(x => new KeyValuePair<string, IEnumerable<int>>(
x.Key,
x.Select(y => y.Value).ToEnumerable()))
//.SubscribeOn(Scheduler.Default)
.Subscribe(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]")))
{
}

我希望输出与此类似(顺序不重要):

one [1]
fish [2]
two [1]

但相反,它要么阻塞(可能是死锁),要么根本不提供任何输出(当我取消注释 LINQ 语句的SubscribeOn子句时)。

我试图从实际使用场景中减少上述代码,该场景尝试链接两个 TPL 数据流块但遇到类似的行为:

Func<IEnumerable<int>, int> aggregate = x => x.Sum();
var sourceBlock = new TransformBlock<string, KeyValuePair<string, int>>(x => new KeyValuePair<string, int>(x, 1));
var targetBlock = new ActionBlock<KeyValuePair<string, IEnumerable<int>>>(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]"));
using (sourceBlock.AsObservable()
.GroupBy(x => x.Key)
.Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Select(y => y.Value).ToEnumerable()))
.Subscribe(targetBlock.AsObserver()))
{
foreach (var kvp in new[] { "one", "fish", "two", "fish" })
{
sourceBlock.Post(kvp);
}
sourceBlock.Complete();
targetBlock.Completion.Wait();
}

我知道有框架提供了SumCount方法,这些方法IObservable<T>但我只能IEnumerable<T>聚合函数。

我是否误解了ToEnumerable,我该怎么做才能解决它?

编辑:IEnumerable<T>的约束是由我尝试链接的两个数据流块的目标引入的,其签名不是我的要更改的。

GroupBy的工作方式是这样的:当新元素到达时,它会提取一个键,并查看该键之前是否已经被观察到过。如果没有 - 它会创建新组(新的可观察)并按下密钥和可观察的密钥。关键点是 - 当您订阅GroupBy并且项目被推送到您的订阅时 - 序列尚未分组。被推送的是组键和另一个可观察的(IGroupedObservable),该组中的元素将被推送到该组。

您在代码中所做的本质上是订阅GroupBy然后在GroupBy订阅中阻止尝试枚举IGroupingObservable。但此时无法枚举它,因为分组不完整。要使其完整 -GroupBy应处理整个序列,但不能,因为它在等待订阅处理程序完成时被阻止。订阅处理程序等待GroupBy完成(阻止尝试枚举尚未就绪的序列)。因此,您陷入了僵局。

如果您尝试引入ObserveOn(Scheduler.Default)在线程池线程上运行订阅处理程序 - 这将无济于事。它将消除死锁,但会引入竞争条件,您将丢失项目,因为您在开始枚举ToEnumerable结果时仅订阅单个组。此时可能为时已晚,并且在订阅之前(通过开始枚举)将某些项目推送到可观察的单个组。这些项目不会重播,因此会丢失。

什么会帮助它确实使用Count()IObservable提供,但由于某种原因,您说您不能这样做。

对于数据流块,可以尝试如下操作:

sourceBlock.AsObservable()
.GroupBy(x => x.Key)
.Select(x => {
var res = new { x.Key, Value = x.Select(y => y.Value).Replay() };
// subscribe right here
// Replay will ensure that no items are missed
res.Value.Connect();
return res;
})                
// observe on thread pool threads to not deadlock if necessary
// in the example with datablock in your question - it is not
//.ObserveOn(Scheduler.Default)
// now no deadlock and no missing items
.Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Value.ToEnumerable()))
.Subscribe(targetBlock.AsObserver())

相关内容

  • 没有找到相关文章

最新更新