我已经阅读了之前关于ConcurrentBag
的问题,但没有找到多线程实现的实际示例。
ConcurrentBag是一个线程安全的包实现,针对同一线程将生产和消费存储在包中的数据的场景进行了优化。"
目前这是我代码中的当前用法(这是简化的,而不是实际的代码):
private void MyMethod()
{
List<Product> products = GetAllProducts(); // Get list of products
ConcurrentBag<Product> myBag = new ConcurrentBag<Product>();
//products were simply added here in the ConcurrentBag to simplify the code
//actual code process each product before adding in the bag
Parallel.ForEach(
products,
new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
product => myBag.Add(product));
ProcessBag(myBag); // method to process each items in the concurrentbag
}
我的问题:这是ConcurrentBag
的正确用法吗?在这种情况下可以使用ConcurrentBag
吗?
对我来说,我认为一个简单的List<Product>
和手动锁会做得更好。这样做的原因是,上面的场景已经打破了"同一个线程将生产和消费存储在包中的数据"规则。此外,我还发现,在并行的每个线程中创建的ThreadLocal
存储在操作后仍然存在(即使线程被重用,这是对的吗?),这可能会导致不希望的内存泄漏。我说的对吗,伙计们?或者一个简单的明确或空的方法来删除项目在ConcurrentBag
是足够的?
这看起来是对ConcurrentBag的一个很好的使用。线程局部变量是包的成员,在包被回收的同时,它们也有资格被垃圾收集(清除内容不会释放它们)。您是对的,一个带锁的简单List就足以满足您的情况。如果在循环中所做的工作非常重要,那么线程同步的类型对整体性能影响不大。在这种情况下,您可能更愿意使用您熟悉的内容。
另一个选择是使用ParallelEnumerable。选择,它与您想要做的事情更接近。同样,您将看到的任何性能差异可能都可以忽略不计,坚持使用您所知道的并没有什么错。
一如既往,如果它的性能是关键的,没有什么可以替代尝试和测量。
在我看来bmm60 's是不正确的。ConcurrentBag
实例内部包含每个向其添加项目的线程的迷你包,因此项目插入不涉及任何线程锁,因此所有Environment.ProcessorCount
线程都可以进入full - swing,而不需要等待,也不需要任何线程上下文切换。在迭代收集的项时可能需要线程同步,但是在原始示例中,迭代是在所有插入完成后由单个线程完成的。此外,如果ConcurrentBag
使用互锁技术作为线程同步的第一层,那么可能根本不涉及Monitor操作。
List<T>
实例并使用lock关键字包装其Add()方法调用将严重损害性能。首先,由于Monitor.Enter()
和Monitor.Exit()
调用是恒定的,每个调用都需要深入到内核模式并使用Windows同步原语。其次,有时候一个线程偶尔会被第二个线程阻塞,因为第二个线程还没有完成它的添加。
对我来说,上面的代码是正确使用ConcurrentBag
类的一个很好的例子。
这是
ConcurrentBag
的正确用法吗?在这种情况下可以使用ConcurrentBag
吗?
没有,原因有很多:
- 这不是这个集合的预期使用场景。
ConcurrentBag<T>
用于混合生产者-消费者场景,这意味着每个线程都希望添加和从包中取出物品。你的场景完全不是这样的。您有许多添加项的线程,而没有获取项的线程。ConcurrentBag<T>
的主要应用是创建对象池(创建或销毁可重用对象的池)。考虑到Microsoft.Extensions.ObjectPool包中ObjectPool<T>
类的可用性,甚至这个适合这个集合的小众应用程序也存在争议。 - 不保留插入顺序。即使保持插入顺序并不重要,得到一个混乱的输出也会使调试更加困难。
- 它创建必须由GC收集的垃圾。它为每个线程创建一个
WorkStealingQueue
(内部类),每个类包含一个可扩展数组,因此线程越多,分配的对象就越多。此外,每次枚举时,它都会复制一个新数组中的所有项,并在每个foreach
上显示一个IEnumerator<T> GetEnumerator()
属性。 - 有更好的选择,提供更好的性能和更好的订购行为。
在您的场景中,您可以将并行执行的结果存储在一个简单的数组中。只需创建一个长度等于products.Count
的数组,从Parallel.ForEach
切换到Parallel.For
,并将结果直接分配给results
数组的相应槽位,而根本不做任何同步:
List<Product> products = GetAllProducts(); // Get list of products
Product[] results = Product[products.Count];
Parallel.For(0, products.Count,
new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
i => results[i] = products[i]);
ProcessResults(results);
这样你将得到完美排序的结果,存储在一个容器中,它具有最紧凑的大小和所有。net集合中最快的枚举,只做一个对象分配。
如果您担心上述操作的线程安全性,则无需担心。每个线程在results
数组中的不同槽上写入。在并行执行完成后,当前线程可以完全看到存储在数组中的所有值,因为TPL在任务排队时以及任务执行的开始/结束时包含了适当的barrier(引用)。
(我在这个答案中发布了更多关于ConcurrentBag<T>
的想法)
如果List<T>
与Add()
方法周围的锁一起使用,它将使线程等待,并将降低使用Parallel.ForEach()
的性能增益