具有固定大小FIFO队列的生产者/消费者模式



我需要围绕固定大小的FIFO队列实现生产者/消费者模式。我认为围绕ConcurrentQueue的包装类可能适用于此,但我不完全确定(而且我以前从未使用过ConcurrentQueue)。其中的转折点是,队列只需要容纳固定数量的项目(在我的情况下是字符串)。我的应用程序将有一个生产者任务/线程和一个消费者任务/线程。当我的使用者任务运行时,它需要及时将队列中存在的所有项目排成队列并进行处理。

值得一提的是,我的消费者对排队项目的处理只不过是通过SOAP将它们上传到一个并非100%可靠的web应用程序。如果无法建立连接或调用SOAP调用失败,我应该丢弃这些项,然后返回队列获取更多信息。由于SOAP的开销,我试图最大限度地增加队列中可以在一个SOAP调用中发送的项目数。

有时,我的生产者添加商品的速度可能比我的消费者删除和处理商品的速度更快。如果队列已经满了,而我的生产者需要添加另一个项目,我需要将新项目排入队列,然后将最旧的项目退出队列,以便队列的大小保持不变。基本上,我需要始终将生产的最新商品保留在队列中(即使这意味着由于我的消费者当前正在处理以前的商品,某些商品没有被消费)。

关于生产者保持队列中项目的数量固定,我从这个问题中发现了一个潜在的想法:

固定大小的队列,在新队列时自动将旧值排成队列

我目前正在使用一个包装类(基于这个答案)来围绕ConcurrentQueue和Enqueue()方法,如下所示:

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    public int Size { get; private set; }
    public FixedSizeQueue(int size)
    {
        Size = size;
    }
    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);
        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

我创建了一个这个类的实例,在队列上有一个大小限制,如下所示:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

我启动我的生产者任务,它开始填充队列。当添加项目导致队列计数超过最大大小时,我的Enqueue()方法中的代码似乎可以正确地从队列中删除最旧的项目。现在,我需要我的消费者任务来排队处理商品,但这是我大脑感到困惑的地方。为我的消费者实现出列方法的最佳方式是什么?该方法将在某个时刻拍摄队列的快照,并将所有项目出列以进行处理(生产商在此过程中可能仍在向队列中添加项目)?

简单地说,ConcurrentQueue有一个"ToArray"方法,当输入该方法时,将锁定集合并生成队列中所有当前项的"快照"。如果你想让你的消费者得到一块要处理的东西,你可以锁定入队方法的同一个对象,调用ToArray(),然后旋转一个while(!queue.IsEmpty) queue.TryDequeue(out trash)循环来清除队列,然后返回提取的数组。

这将是您的GetAll()方法:

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

由于您必须清除队列,因此可以简单地将这两种操作组合起来;创建一个适当大小的数组(使用queue.Count),然后在队列不为空的情况下,将一个项目取消队列并放入数组中,然后返回。

现在,这就是具体问题的答案。我现在必须凭良心戴上CodeReview.SE帽子,指出几件事:

  • 切勿使用lock(this)。您永远不知道其他哪些对象可能正在使用您的对象作为锁定焦点,因此当对象从内部锁定时会被阻挡。最佳实践是锁定私有范围的对象实例,通常是为锁定而创建的对象实例:private readonly object syncObj = new object();

  • 由于您无论如何都要锁定包装器的关键部分,所以我将使用普通的List<T>而不是并发集合。访问速度更快,更容易清理,因此您将能够比ConcurrentQueue允许的更简单地完成您正在做的事情。若要入队,请锁定同步对象,在索引0之前插入(),然后使用RemoveRange()从索引大小到列表的当前计数删除任何项目。若要出列,请锁定同一同步对象,调用myList.ToArray()(来自Linq命名空间;与ConcurrentQueue的作用几乎相同),然后在返回数组之前调用myList.Clear()。再简单不过了:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    public int Size { get; private set; }
    public FixedSizeQueue(int size) { Size = size; }
    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            queue.Insert(0,obj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
        }
    }
    public T[] Dequeue()
    {
        lock (syncObj)
        {
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
    }
    
  • 您似乎明白使用此模型是在丢弃排队的项目。这通常不是一件好事,但我愿意给你怀疑的好处。然而,我要说的是,有一种无损的方法可以实现这一点,使用BlockingCollection。BlockingCollection包装任何IProducerConsumerCollection,包括大多数System.Collection.Concurrent类,并允许您指定队列的最大容量。然后,集合将阻止任何试图从空队列出列的线程,或任何试图添加到完整队列的线程,直到添加或删除了项目,从而有了要获取的内容或插入的空间。这是实现具有最大大小的生产者-消费者队列的最佳方式,或者需要"轮询"来查看消费者是否有什么需要处理的队列。如果你走这条路,只有消费者必须丢弃的队列才会被丢弃;消费者将看到生产者放入的所有行,并对每一行做出自己的决定。

您不希望将lockthis一起使用。请参阅为什么锁(this){…}坏?了解更多详细信息。

此代码

// if queue count > max queue size, then dequeue an item
while (queue.Count > Size) 
{
    T objOut;
    queue.TryDequeue(out objOut);
}

建议您需要以某种方式等待通知消费者该商品的可用性。在这种情况下,请考虑使用BlockingCollection<T>

最新更新