阻塞集合线程安全吗?


BlockingCollection<int> blockingCollection = new BlockingCollection<int>();
// create and start a producer
Task.Factory.StartNew(() => {
// put items into the collectioon
for (int i = 0; i < 1000; i++)
{
blockingCollection.Add(i);
}
// mark the collection as complete
blockingCollection.CompleteAdding();
});
// create and start a producer
Task.Factory.StartNew(() => {
while (!blockingCollection.IsCompleted)
{
// take an item from the collection
int item = blockingCollection.Take();
// print out the item
Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
}
});
// create and start a producer
Task.Factory.StartNew(() => {
while (!blockingCollection.IsCompleted)   // if the blockingCollection is not completed
{
// take an item from the collection
int item = blockingCollection.Take();  // but in here, all the items have been taken by other thread, this line will wait forever?
// print out the item
Console.WriteLine("Taskid{1} Item {0}", item,Task.CurrentId);
}
});
Console.ReadLine();

关键代码行:

while (!blockingCollection.IsCompleted)   // if the blockingCollection is not completed
int item = blockingCollection.Take();  // but in here, all the items have been taken by other thread, this line will wait forever?

是的,BlockingCollection<T>线程安全的

为了避免IsCompleted检查,您可以在GetConsumingEnumerable()的帮助下放置相同的逻辑,该逻辑将等到阻塞收集完成:

// Producer:
Task.Run(() => { 
for (int i = 0; i < 1000; i++)
blockingCollection.Add(i);
blockingCollection.CompleteAdding();
});    
// Consumers:
Task.Run(() => {
foreach (var item in blockingCollection.GetConsumingEnumerable()) 
Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
});
Task.Run(() => {
foreach (var item in blockingCollection.GetConsumingEnumerable()) 
Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
});

BlockingCollection线程安全的,从某种意义上说,当多个线程并发调用时,其内部状态受到保护,不会损坏。从某种意义上说,它不是线程安全的,因为它的存在就为线程安全提供了线程不安全的代码块!

您的代码是不安全的,因为在调用IsCompletedTake之间,另一个线程可以调用Complete方法并将BlockingCollection的状态从未完成更改为已完成。线程安全的解决方案是使用在原子操作中结合IsCompletedTake的 API。BlockingCollection类中没有这样的 API,但您可以使用GetConsumingEnumerable方法实现所需的目标。此方法返回一个在等待可用项时阻止的IEnumerable,并在BlockingCollection完成时完成。实际上,这是使用此集合的标准和首选方式。

问题 1 - 线程安全吗?

引用阻塞集合文档

为实现 IProducerConsumerCollection 的线程安全集合提供阻塞和绑定功能。

问题2 -Take()会永远吗?

来自官方文档: MS 文档关于阻止收集 - 采取

OperationCanceledException

块集合为空,并且已标记为已完成添加。

因此,当在 DoneBlockingCollection上调用Take时,将引发异常,您可以相应地处理

最新更新