我有多个生产者和一个消费者。但是,如果队列中有一些东西尚未被消耗,则生产者不应该再次将其排队。(使用默认并发队列的唯一无重复阻塞集合)
if (!myBlockingColl.Contains(item))
myBlockingColl.Add(item)
但是,阻塞集合没有包含方法,也没有提供任何类似TryPeek()
的方法。如何访问底层并发队列,以便执行
if (!myBlockingColl.myConcurQ.trypeek(item)
myBlockingColl.Add(item)
在尾翼旋转?
这是个有趣的问题。这是我第一次看到有人要求阻塞队列忽略重复项。奇怪的是,我在BCL中找不到任何你想要的东西。我说这很奇怪,因为BlockingCollection
可以接受IProducerConsumerCollection
作为底层集合,该集合具有TryAdd
方法,该方法被宣传为在检测到重复时能够失败。问题是,我没有看到IProducerConsumerCollection
防止重复的具体实现。至少我们可以自己写。
public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
// TODO: You will need to fully implement IProducerConsumerCollection.
private Queue<T> queue = new Queue<T>();
public bool TryAdd(T item)
{
lock (queue)
{
if (!queue.Contains(item))
{
queue.Enqueue(item);
return true;
}
return false;
}
}
public bool TryTake(out T item)
{
lock (queue)
{
item = null;
if (queue.Count > 0)
{
item = queue.Dequeue();
}
return item != null;
}
}
}
现在我们有了不接受重复的IProducerConsumerCollection
,我们可以这样使用它:
public class Example
{
private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());
public Example()
{
new Thread(Consume).Start();
}
public void Produce(object item)
{
bool unique = queue.TryAdd(item);
}
private void Consume()
{
while (true)
{
object item = queue.Take();
}
}
}
你可能不喜欢我对NoDuplicatesConcurrentQueue
的实现。如果您认为需要TPL集合提供的低锁性能,您当然可以自由地使用ConcurrentQueue
或其他工具来实现自己的集合。
更新:
我今天早上能够测试代码。有一些好消息和坏消息。好消息是,这在技术上是可行的。坏消息是,您可能不希望这样做,因为BlockingCollection.TryAdd
会拦截来自底层IProducerConsumerCollection.TryAdd
方法的返回值,并在检测到false
时抛出异常。是的,没错。它不像您期望的那样返回false
,而是生成一个异常。老实说,这既令人惊讶又荒谬。TryXXX方法的全部意义在于它们不应该抛出异常。我非常失望。
除了Brian Gideon在Update后面提到的警告之外,他的解决方案还存在以下性能问题:
-
O(n)对队列(
queue.Contains(item)
)的操作会随着队列的增长对性能产生严重影响 - 锁限制并发性(他确实提到了)
以下代码通过
改进了Brian的解决方案- 使用哈希集执行 0 (1)查找
- 结合
System.Collections.Concurrent
命名空间 的2个数据结构
注意:由于没有ConcurrentHashSet
,我使用ConcurrentDictionary
,忽略值。
public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>();
private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
public bool TryAdd(T item)
{
if (existingElements.TryAdd(item, false))
{
queue.Enqueue(item);
return true;
}
return false;
}
public bool TryTake(out T item)
{
if (queue.TryDequeue(out item))
{
bool _;
existingElements.TryRemove(item, out _);
return true;
}
return false;
}
...
}
注意:看待这个问题的另一种方式是:您希望是一个保留插入顺序的集合。
我建议使用lock实现您的操作,这样您就不会以一种破坏它的方式读写项目,使它们原子化。例如,对于任意IEnumerable:
object bcLocker = new object();
// ...
lock (bcLocker)
{
bool foundTheItem = false;
foreach (someClass nextItem in myBlockingColl)
{
if (nextItem.Equals(item))
{
foundTheItem = true;
break;
}
}
if (foundTheItem == false)
{
// Add here
}
}
如何访问阻塞集合的底层默认并发队列?
默认由ConcurrentQueue<T>
支持BlockingCollection<T>
。换句话说,如果您没有显式地指定它的后台存储,它将在幕后创建一个ConcurrentQueue<T>
。由于您希望直接访问底层存储,因此可以手动创建ConcurrentQueue<T>
并将其传递给BlockingCollection<T>
的构造函数:
ConcurrentQueue<Item> queue = new();
BlockingCollection<Item> collection = new(queue);
不幸的是,ConcurrentQueue<T>
集合没有带输入参数的TryPeek
方法,所以你想做的是不可能的:
if (!queue.TryPeek(item)) // Compile error, missing out keyword
collection.Add(item);
还要注意queue
现在由collection
拥有。如果您试图直接改变它(通过发出Enqueue
或TryDequeue
命令),collection
将抛出异常。