我的_baseBlockContainer.GetBaseBlocks();
返回具有15317
对象的ConcurrentQueue
。为了进一步处理,我想按Type
对它们进行排序。然而,它总是"错过"。一些对象。
似乎我的Parallel.ForEach
不是线程安全的,因为Type
的ConcurrentQueue
中的对象数量有时比同步foreach
排序时少(Type
少1到250个对象);但我不明白在哪里/为什么。
var baseBlocks = _baseBlockContainer.GetBaseBlocks();
var baseBlocksByTypeConcurrent = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this always differ
Parallel.ForEach(baseBlocks, bb =>
{
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
});
var baseBlocksByType = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this are always the same
foreach (var bb in baseBlocks)
{
if (!baseBlocksByType.ContainsKey(bb.GetType()))
{
baseBlocksByType[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByType[bb.GetType()].Enqueue(bb);
}
替换为
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
与这个:
baseBlocksByTypeConcurrent.TryAdd(bb.GetType(), new ConcurrentQueue<BaseBlock>());
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
你现有的代码的问题是,如果.ContainsKey
计算为false
在多个线程在同一时间相同的块类型,那么他们将所有设置值对应的类型为一个新的队列,擦除任何现有的队列为该类型。也就是说:ContainsKey
和索引器本身是线程安全的,但是当你这样单独使用时就不安全了。
TryAdd
是线程安全的,只会添加该键一次,而不是像分配给索引器那样重写它。
您的代码遇到了所谓的竞态条件。单独使用并发集合并不能防止竞态条件的发生。您还必须通过使用它们特殊的原子api来正确地使用它们。在您的情况下,适当使用的API是GetOrAdd
方法:
如果密钥不存在,则向
ConcurrentDictionary<TKey,TValue>
添加密钥/值对。返回新值,如果键已经存在则返回现有值。
用法:
ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(baseBlocks, options, bb =>
{
baseBlocksByTypeConcurrent
.GetOrAdd(bb.GetType(), _ => new ConcurrentQueue<BaseBlock>())
.Enqueue(bb);
});
作为旁注,无论何时使用Parallel.ForEach
方法,建议显式指定MaxDegreeOfParallelism
。默认的MaxDegreeOfParallelism
是-1,这意味着无界并行,这在实践中使ThreadPool
饱和。
¹这是我的建议,不是微软的。官方文档提供的一般建议是不需要修改MaxDegreeOfParallelism
设置。微软的论点是,默认情况下将此选项配置为Environment.ProcessorCount
可能会导致性能下降,并且在一个迭代依赖于另一个执行迭代的情况下,它可能会导致死锁。显然,微软并不认为ThreadPool
饱和是一个严重的问题。