平行.每个丢失/忽略/跳过/不添加对象



我的_baseBlockContainer.GetBaseBlocks();返回具有15317对象的ConcurrentQueue。为了进一步处理,我想按Type对它们进行排序。然而,它总是"错过"。一些对象。

似乎我的Parallel.ForEach不是线程安全的,因为TypeConcurrentQueue中的对象数量有时比同步foreach排序时少(Type少1到250个对象);但我不明白在哪里/为什么。

var baseBlocks = _baseBlockContainer.GetBaseBlocks();
var baseBlocksByTypeConcurrent = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this always differ
Parallel.ForEach(baseBlocks, bb =>
{
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
});
var baseBlocksByType = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this are always the same
foreach (var bb in baseBlocks)
{
if (!baseBlocksByType.ContainsKey(bb.GetType()))
{
baseBlocksByType[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByType[bb.GetType()].Enqueue(bb);
}

替换为

if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);

与这个:

baseBlocksByTypeConcurrent.TryAdd(bb.GetType(), new ConcurrentQueue<BaseBlock>());
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);

你现有的代码的问题是,如果.ContainsKey计算为false在多个线程在同一时间相同的块类型,那么他们将所有设置值对应的类型为一个新的队列,擦除任何现有的队列为该类型。也就是说:ContainsKey和索引器本身是线程安全的,但是当你这样单独使用时就不安全了。

TryAdd是线程安全的,只会添加该键一次,而不是像分配给索引器那样重写它。

您的代码遇到了所谓的竞态条件。单独使用并发集合并不能防止竞态条件的发生。您还必须通过使用它们特殊的原子api来正确地使用它们。在您的情况下,适当使用的API是GetOrAdd方法:

如果密钥不存在,则向ConcurrentDictionary<TKey,TValue>添加密钥/值对。返回新值,如果键已经存在则返回现有值。

用法:

ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(baseBlocks, options, bb =>
{
baseBlocksByTypeConcurrent
.GetOrAdd(bb.GetType(), _ => new ConcurrentQueue<BaseBlock>())
.Enqueue(bb);
});

作为旁注,无论何时使用Parallel.ForEach方法,建议显式指定MaxDegreeOfParallelism。默认的MaxDegreeOfParallelism是-1,这意味着无界并行,这在实践中使ThreadPool饱和。

¹这是我的建议,不是微软的。官方文档提供的一般建议是不需要修改MaxDegreeOfParallelism设置。微软的论点是,默认情况下将此选项配置为Environment.ProcessorCount可能会导致性能下降,并且在一个迭代依赖于另一个执行迭代的情况下,它可能会导致死锁。显然,微软并不认为ThreadPool饱和是一个严重的问题。

最新更新