没有BlockingCollection的简单生产者-消费者集合



我想编写一个简单的生产者-消费者队列,而不使用内置的System.Collections.Concurrent.BlockingCollection。下面是一个"似乎"有效的快速尝试。它在线程方面、竞争条件、死锁等方面有什么问题吗。?

class ProducerConsumerQueue<T>
{
Queue<T> Queue = new Queue<T>();
ManualResetEvent Event = new ManualResetEvent(false);
object Lock = new object();
public void Add(T t)
{
lock (Lock)
{
Queue.Enqueue(t);
}
Event.Set();
}
public bool TryTake(out T t, int timeout)
{
if (Event.WaitOne(timeout))
{
lock (Lock)
{
if (Queue.Count > 0)
{
t = Queue.Dequeue();
if (Queue.Count == 0) Event.Reset();
return true;
}
}
}
t = default(T);
return false;
}
}

Btw。我只需要AddTryTake两种方法,我不需要IEnumerable

Microsoft最近放弃了System.Threading.Channels,它旨在提供优化的生产者/消费者API,在这种情况下可能非常适合。它涵盖了无边界和有边界的场景,并包括单个和多个读取器/写入器场景。API使用起来非常简单和直观;唯一需要注意的是,它使用了面向async的API(对于消费者,对于有界频道,对于生产者(。

重点是:你不写的代码往往是痛点较少的代码,尤其是如果它是由一个对目标特定问题有专业知识和兴趣的团队编写的。


但是:您可以在当前代码中执行所有操作,而不需要C#中的ManualResetEvent-lock。它只是Monitor中最简单的部分的包装,但Monitor还提供等待/脉冲功能:

class ProducerConsumerQueue<T>
{
private readonly Queue<T> Queue = new Queue<T>();
public void Add(T t)
{
lock (Queue)
{
Queue.Enqueue(t);
if (Queue.Count == 1)
{
// wake up one sleeper
Monitor.Pulse(Queue);
}
}
}
public bool TryTake(out T t, int millisecondsTimeout)
{
lock (Queue)
{
if (Queue.Count == 0)
{
// try and wait for arrival
Monitor.Wait(Queue, millisecondsTimeout);
}
if (Queue.Count != 0)
{
t = Queue.Dequeue();
return true;
}
}
t = default(T);
return false;
}
}

我认为同时使用lockManualResetEvent是多余的。我建议您阅读更多关于ManualResetEvent的内容,了解如何在代码中进入和退出同步区域(您也可以查看System.Threading下提供的其他同步机制(

如果不仅仅是为了锻炼,您还可以看看NetMQ。

希望它能有所帮助!

根据我对问题的评论,

这是我提出的解决方案。

public class BlockingQueue<T>
{
// In order to get rid of Lock object
// Any thread should be able to add items to the queue
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
// Only one thread is able to consume from queue
// You can fine tune this to your interest
private readonly SemaphoreSlim _slim = new SemaphoreSlim(1,1);
public void Add(T item) {
_queue.Enqueue(item);
}
public bool TryTake(out T item, TimeSpan timeout) {
if (_slim.Wait(timeout)){
return _queue.TryDequeue(out item);
}
item = default(T);
return false;
}
}

最新更新