如何访问阻塞集合的底层默认并发队列



我有多个生产者和一个消费者。但是,如果队列中有一些东西尚未被消耗,则生产者不应该再次将其排队。(使用默认并发队列的唯一无重复阻塞集合)

if (!myBlockingColl.Contains(item))
    myBlockingColl.Add(item)

但是,阻塞集合没有包含方法,也没有提供任何类似TryPeek()的方法。如何访问底层并发队列,以便执行

之类的操作?
if (!myBlockingColl.myConcurQ.trypeek(item)
  myBlockingColl.Add(item)

在尾翼旋转?

这是个有趣的问题。这是我第一次看到有人要求阻塞队列忽略重复项。奇怪的是,我在BCL中找不到任何你想要的东西。我说这很奇怪,因为BlockingCollection可以接受IProducerConsumerCollection作为底层集合,该集合具有TryAdd方法,该方法被宣传为在检测到重复时能够失败。问题是,我没有看到IProducerConsumerCollection防止重复的具体实现。至少我们可以自己写。

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  // TODO: You will need to fully implement IProducerConsumerCollection.
  private Queue<T> queue = new Queue<T>();
  public bool TryAdd(T item)
  {
    lock (queue)
    {
      if (!queue.Contains(item))
      {
        queue.Enqueue(item);
        return true;
      }
      return false;
    }
  }
  public bool TryTake(out T item)
  {
    lock (queue)
    {
      item = null;
      if (queue.Count > 0)
      {
        item = queue.Dequeue();
      }
      return item != null;
    }
  }
}

现在我们有了不接受重复的IProducerConsumerCollection,我们可以这样使用它:

public class Example
{
  private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());
  public Example()
  {
    new Thread(Consume).Start();
  }
  public void Produce(object item)
  {
    bool unique = queue.TryAdd(item);
  }
  private void Consume()
  {
    while (true)
    {
      object item = queue.Take();
    }
  }
}

你可能不喜欢我对NoDuplicatesConcurrentQueue的实现。如果您认为需要TPL集合提供的低锁性能,您当然可以自由地使用ConcurrentQueue或其他工具来实现自己的集合。

更新:

我今天早上能够测试代码。有一些好消息和坏消息。好消息是,这在技术上是可行的。坏消息是,您可能不希望这样做,因为BlockingCollection.TryAdd会拦截来自底层IProducerConsumerCollection.TryAdd方法的返回值,并在检测到false时抛出异常。是的,没错。它不像您期望的那样返回false,而是生成一个异常。老实说,这既令人惊讶又荒谬。TryXXX方法的全部意义在于它们不应该抛出异常。我非常失望。

除了Brian Gideon在Update后面提到的警告之外,他的解决方案还存在以下性能问题:

  • O(n)对队列(queue.Contains(item))的操作会随着队列的增长对性能产生严重影响
  • 锁限制并发性(他确实提到了)

以下代码通过

改进了Brian的解决方案
  • 使用哈希集执行 0 (1)查找
  • 结合System.Collections.Concurrent命名空间
  • 的2个数据结构

注意:由于没有ConcurrentHashSet,我使用ConcurrentDictionary,忽略值。

在这种罕见的情况下,幸运的是,可以简单地将多个更简单的并发数据结构组合成一个更复杂的并发数据结构,而无需添加锁。在这里,两个并发数据结构上的操作顺序很重要。
public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>();
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    public bool TryAdd(T item)
    {
        if (existingElements.TryAdd(item, false))
        {
            queue.Enqueue(item);
            return true;
        }
        return false;
    }
    public bool TryTake(out T item)
    {
        if (queue.TryDequeue(out item))
        {
            bool _;
            existingElements.TryRemove(item, out _);
            return true;
        }
        return false;
    }
    ...
}

注意:看待这个问题的另一种方式是:您希望是一个保留插入顺序的集合

我建议使用lock实现您的操作,这样您就不会以一种破坏它的方式读写项目,使它们原子化。例如,对于任意IEnumerable:

object bcLocker = new object();
// ...
lock (bcLocker)
{
    bool foundTheItem = false;
    foreach (someClass nextItem in myBlockingColl)
    {
        if (nextItem.Equals(item))
        {
            foundTheItem = true;
            break;
        }
    }
    if (foundTheItem == false)
    {
        // Add here
    }
}

如何访问阻塞集合的底层默认并发队列?

默认由ConcurrentQueue<T>支持BlockingCollection<T>。换句话说,如果您没有显式地指定它的后台存储,它将在幕后创建一个ConcurrentQueue<T>。由于您希望直接访问底层存储,因此可以手动创建ConcurrentQueue<T>并将其传递给BlockingCollection<T>的构造函数:

ConcurrentQueue<Item> queue = new();
BlockingCollection<Item> collection = new(queue);

不幸的是,ConcurrentQueue<T>集合没有带输入参数的TryPeek方法,所以你想做的是不可能的:

if (!queue.TryPeek(item)) // Compile error, missing out keyword
    collection.Add(item);

还要注意queue现在由collection拥有。如果您试图直接改变它(通过发出EnqueueTryDequeue命令),collection将抛出异常。

相关内容

  • 没有找到相关文章

最新更新