BlockingCollection<int> blockingCollection = new BlockingCollection<int>();
// create and start a producer
Task.Factory.StartNew(() => {
// put items into the collectioon
for (int i = 0; i < 1000; i++)
{
blockingCollection.Add(i);
}
// mark the collection as complete
blockingCollection.CompleteAdding();
});
// create and start a producer
Task.Factory.StartNew(() => {
while (!blockingCollection.IsCompleted)
{
// take an item from the collection
int item = blockingCollection.Take();
// print out the item
Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
}
});
// create and start a producer
Task.Factory.StartNew(() => {
while (!blockingCollection.IsCompleted) // if the blockingCollection is not completed
{
// take an item from the collection
int item = blockingCollection.Take(); // but in here, all the items have been taken by other thread, this line will wait forever?
// print out the item
Console.WriteLine("Taskid{1} Item {0}", item,Task.CurrentId);
}
});
Console.ReadLine();
关键代码行:
while (!blockingCollection.IsCompleted) // if the blockingCollection is not completed
int item = blockingCollection.Take(); // but in here, all the items have been taken by other thread, this line will wait forever?
是的,BlockingCollection<T>
是线程安全的。
为了避免IsCompleted
检查,您可以在GetConsumingEnumerable()
的帮助下放置相同的逻辑,该逻辑将等到阻塞收集完成:
// Producer:
Task.Run(() => {
for (int i = 0; i < 1000; i++)
blockingCollection.Add(i);
blockingCollection.CompleteAdding();
});
// Consumers:
Task.Run(() => {
foreach (var item in blockingCollection.GetConsumingEnumerable())
Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
});
Task.Run(() => {
foreach (var item in blockingCollection.GetConsumingEnumerable())
Console.WriteLine("Taskid{1} Item {0}", item, Task.CurrentId);
});
BlockingCollection
类是线程安全的,从某种意义上说,当多个线程并发调用时,其内部状态受到保护,不会损坏。从某种意义上说,它不是线程安全的,因为它的存在就为线程安全提供了线程不安全的代码块!
您的代码是不安全的,因为在调用IsCompleted
和Take
之间,另一个线程可以调用Complete
方法并将BlockingCollection
的状态从未完成更改为已完成。线程安全的解决方案是使用在原子操作中结合IsCompleted
和Take
的 API。BlockingCollection
类中没有这样的 API,但您可以使用GetConsumingEnumerable
方法实现所需的目标。此方法返回一个在等待可用项时阻止的IEnumerable
,并在BlockingCollection
完成时完成。实际上,这是使用此集合的标准和首选方式。
问题 1 - 线程安全吗?
引用阻塞集合文档
为实现 IProducerConsumerCollection 的线程安全集合提供阻塞和绑定功能。
问题2 -Take()
会永远吗?
来自官方文档: MS 文档关于阻止收集 - 采取
OperationCanceledException
块集合为空,并且已标记为已完成添加。
因此,当在 DoneBlockingCollection
上调用Take
时,将引发异常,您可以相应地处理