我需要实现一种任务缓冲区。基本要求是:
- 在单个后台线程中处理任务
- 从多个线程接收任务
- 处理所有接收到的任务,即确保在接收到停止信号后,缓冲区中的缓冲任务被耗尽
- 必须保持每个线程接收的任务的顺序
我正在考虑使用下面这样的队列来实现它。感谢对实施情况的反馈。有没有其他更聪明的想法来实现这样的事情?
public class TestBuffer
{
private readonly object queueLock = new object();
private Queue<Task> queue = new Queue<Task>();
private bool running = false;
public TestBuffer()
{
}
public void start()
{
Thread t = new Thread(new ThreadStart(run));
t.Start();
}
private void run()
{
running = true;
bool run = true;
while(run)
{
Task task = null;
// Lock queue before doing anything
lock (queueLock)
{
// If the queue is currently empty and it is still running
// we need to wait until we're told something changed
if (queue.Count == 0 && running)
{
Monitor.Wait(queueLock);
}
// Check there is something in the queue
// Note - there might not be anything in the queue if we were waiting for something to change and the queue was stopped
if (queue.Count > 0)
{
task = queue.Dequeue();
}
}
// If something was dequeued, handle it
if (task != null)
{
handle(task);
}
// Lock the queue again and check whether we need to run again
// Note - Make sure we drain the queue even if we are told to stop before it is emtpy
lock (queueLock)
{
run = queue.Count > 0 || running;
}
}
}
public void enqueue(Task toEnqueue)
{
lock (queueLock)
{
queue.Enqueue(toEnqueue);
Monitor.PulseAll(queueLock);
}
}
public void stop()
{
lock (queueLock)
{
running = false;
Monitor.PulseAll(queueLock);
}
}
public void handle(Task dequeued)
{
dequeued.execute();
}
}
您实际上可以使用开箱即用的BlockingCollection来处理此问题。
它被设计为有一个或多个生产者和一个或更多消费者。在您的情况下,您将有多个生产商和一个消费者。
当你收到停止信号时,让信号处理器
- 信号发生器线程停止
- 在BlockingCollection实例上调用CompleteAdding
使用者线程将继续运行,直到所有排队的项目都被删除和处理,然后它将遇到BlockingCollection完成的条件。当线程遇到这种情况时,它就会退出。
您应该考虑ConcurrentQueue,它实际上是FIFO。如果不合适,请尝试Thread Safe Collections中的一些同类产品。使用这些可以避免一些风险。
我建议您查看一下TPL数据流。BufferBlock是您想要的,但它提供了更多。
看看我的线程安全FIFO队列的轻量级实现,它是一个使用线程池的非阻塞同步工具,在大多数情况下比创建自己的线程要好,也比使用阻塞同步工具作为锁和互斥锁要好。https://github.com/Gentlee/SerialQueue
用法:
var queue = new SerialQueue();
var result = await queue.Enqueue(() => /* code to synchronize */);
您可以在上使用Rx。NET 3.5。它可能从来没有从RC中出来过,但我相信它是稳定的*,并在许多生产系统中使用。如果您不需要Subject,您可能会为找到基元(如并发集合)。NET 3.5,您可以使用并没有附带的。NET Framework 4.0之前。
用于.net 3.5 的Rx(反应式扩展)的替代方案
*-Nit picker的角落:除了可能的高级时间窗口,这超出了范围,但缓冲区(按计数和时间)、排序和调度器都是稳定的。