我有一个可以由多个线程(ConcurrentQueue<MyJob>
)填充的作业队列。我需要实现这个工作异步的连续执行(不是主线程),但只有一个线程在同一时间。我试过这样做:
public class ConcurrentLoop {
private static ConcurrentQueue<MyJob> _concurrentQueue = new ConcurrentQueue<MyJob>();
private static Task _currentTask;
private static object _lock = new object();
public static void QueueJob(Job job)
{
_concurrentQueue.Enqueue(job);
checkLoop();
}
private static void checkLoop()
{
if ( _currentTask == null || _currentTask.IsCompleted )
{
lock (_lock)
{
if ( _currentTask == null || _currentTask.IsCompleted )
{
_currentTask = Task.Run(() =>
{
MyJob current;
while( _concurrentQueue.TryDequeue( out current ) )
//Do something
});
}
}
}
}
}
这段代码在我看来有一个问题:如果任务完成执行(TryDequeue
返回false,但任务尚未被标记为完成),在这一刻,我得到一个新的工作,它将不会被执行。我说的对吗?如果是,如何修复
您的问题陈述看起来像一个生产者-消费者问题,并警告您只需要一个消费者。
不需要手动重新实现这些功能。相反,我建议使用BlockingCollection
——它在内部使用ConcurrentQueue
和一个单独的线程进行消费。请注意,这可能适合也可能不适合您的用例。
类似:
_blockingCollection = new BlockingCollection<your type>(); // you may want to create bounded or unbounded collection
_consumingThread = new Thread(() =>
{
foreach (var workItem in _blockingCollection.GetConsumingEnumerable()) // blocks when there is no more work to do, continues whenever a new item is added.
{
// do work with workItem
}
});
_consumingThread.Start();
多个生产者(任务或线程)可以毫无问题地将工作项添加到_blockingCollection
中,并且无需担心同步生产者/消费者。
当你完成生产任务时,调用_blockingCollection.CompleteAdding()
(这个方法不是线程安全的,所以建议事先停止所有的生产者)。也许,您还应该在某处执行_consumingThread.Join()
来终止您的消费线程。
我会使用微软的响应式框架团队的响应式扩展(NuGet "System.Reactive")。这是一个可爱的抽象。
public class ConcurrentLoop
{
private static Subject<MyJob> _jobs = new Subject<MyJob>();
private static IDisposable _subscription =
_jobs
.Synchronize()
.ObserveOn(Scheduler.Default)
.Subscribe(job =>
{
//Do something
});
public static void QueueJob(MyJob job)
{
_jobs.OnNext(job);
}
}
这很好地将所有传入的作业同步到单个流中,并将执行推到Scheduler.Default
(基本上是线程池),但由于它已经序列化了所有输入,一次只能发生一个。这样做的好处是,如果两个值之间存在明显的差距,它将释放线程。这是一个非常精简的解决方案。
你只需要调用_jobs.OnCompleted();
或_subscription.Dispose();
。