作业的多线程队列



我有一个可以由多个线程(ConcurrentQueue<MyJob>)填充的作业队列。我需要实现这个工作异步的连续执行(不是主线程),但只有一个线程在同一时间。我试过这样做:

public class ConcurrentLoop {
    private static ConcurrentQueue<MyJob> _concurrentQueue = new ConcurrentQueue<MyJob>();
    private static Task _currentTask;
    private static object _lock = new object();
    public static void QueueJob(Job job)
    {
        _concurrentQueue.Enqueue(job);
        checkLoop();
    }
    private static void checkLoop()
    {
        if ( _currentTask == null || _currentTask.IsCompleted )
        {
            lock (_lock)
            {
                if ( _currentTask == null || _currentTask.IsCompleted )
                {
                    _currentTask = Task.Run(() =>
                    {
                            MyJob current;
                            while( _concurrentQueue.TryDequeue( out current ) ) 
                                //Do something                                                       
                    });
                }
            }
        }
    }
}

这段代码在我看来有一个问题:如果任务完成执行(TryDequeue返回false,但任务尚未被标记为完成),在这一刻,我得到一个新的工作,它将不会被执行。我说的对吗?如果是,如何修复

您的问题陈述看起来像一个生产者-消费者问题,并警告您只需要一个消费者。

不需要手动重新实现这些功能。相反,我建议使用BlockingCollection——它在内部使用ConcurrentQueue和一个单独的线程进行消费。请注意,这可能适合也可能不适合您的用例。

类似:

_blockingCollection = new BlockingCollection<your type>(); // you may want to create bounded or unbounded collection
_consumingThread = new Thread(() =>
{
    foreach (var workItem in _blockingCollection.GetConsumingEnumerable()) // blocks when there is no more work to do, continues whenever a new item is added.
    {
      // do work with workItem
     }
});
_consumingThread.Start();

多个生产者(任务或线程)可以毫无问题地将工作项添加到_blockingCollection中,并且无需担心同步生产者/消费者。

当你完成生产任务时,调用_blockingCollection.CompleteAdding()(这个方法不是线程安全的,所以建议事先停止所有的生产者)。也许,您还应该在某处执行_consumingThread.Join()来终止您的消费线程。

我会使用微软的响应式框架团队的响应式扩展(NuGet "System.Reactive")。这是一个可爱的抽象。

public class ConcurrentLoop
{
    private static Subject<MyJob> _jobs = new Subject<MyJob>();
    private static IDisposable _subscription =
        _jobs
            .Synchronize()
            .ObserveOn(Scheduler.Default)
            .Subscribe(job =>
            {
                //Do something
            });
    public static void QueueJob(MyJob job)
    {
        _jobs.OnNext(job);
    }
}

这很好地将所有传入的作业同步到单个流中,并将执行推到Scheduler.Default(基本上是线程池),但由于它已经序列化了所有输入,一次只能发生一个。这样做的好处是,如果两个值之间存在明显的差距,它将释放线程。这是一个非常精简的解决方案。

你只需要调用_jobs.OnCompleted();_subscription.Dispose();

相关内容

  • 没有找到相关文章

最新更新