取消处于阻塞状态的任务的最佳方法是什么?



我有任务正在运行,调用从RabbitMQ读取的方法。当队列中没有任何内容时,该方法就会阻塞。所以任务处于"运行"状态,但实际上并没有做任何事情。有什么方法可以优雅地结束这些任务吗?

访问队列的代码如下:

 private void FindWork(CancellationToken ct)
    {
        if (ct.IsCancellationRequested)
            return;
        bool result = false;
        bool process = false;
        bool queueResult = false;
        Work_Work work = null;
        try
        {
            using (Queue workQueue = new Queue(_workQueue))
            {
                // Look for work on the work queue
                workQueue.Open(Queue.Mode.Consume);
                work = workQueue.ConsumeWithBlocking<Work_Work>();
                // Do some work with the message ...
                return;

按如下方式创建任务:

private void Run()
    {
        while (!_stop)
        {
            // Remove and stopped tasks from the pool
            List<int> removeThreads = new List<int>();
            lock (_tasks)
            {
                foreach (KeyValuePair<int, Task> task in _tasks)
                {
                    if (task.Value.Status != TaskStatus.Running)
                    {
                        task.Value.Wait();
                        removeThreads.Add(task.Value.Id);
                    }
                }
                foreach (int taskID in removeThreads)
                    _tasks.Remove(taskID);
            }
            CancellationToken ct = _cts.Token;
            TaskFactory factory = new TaskFactory(ct, TaskCreationOptions.LongRunning, TaskContinuationOptions.LongRunning, null);
            // Create new tasks if we have room in the pool
            while (_tasks.Count < _runningMax)
            {
                Task task = factory.StartNew(() => FindWork(ct));
                lock (_tasks)
                    _tasks.Add(task.Id, task);
            }
            // Take a rest so we don't run the CPU to death
            Thread.Sleep(1000);
        }
    }

目前,我已将任务创建代码更改为如下所示,以便我可以中止任务。我知道这不是一个好办法,但我不知道还能做什么。

while (_tasks.Count < _runningMax)
            {
                Task task = factory.StartNew(() =>
                    {
                        try
                        {
                            using (_cts.Token.Register(Thread.CurrentThread.Abort))
                            {
                                FindWork(ct);
                            }
                        }
                        catch (ThreadAbortException)
                        {
                            return;
                        }
                    }, _cts.Token);
                _tasks.Add(task.Id, task);
            }

可以在您的场景中工作吗?

比起生成多个线程并让它们在队列中等待,我会在无限轮询循环中拥有单个线程,并在新工作进入时让该线程生成新线程。您可以添加一个信号量来限制创建的线程数量。检查下面的示例代码,我使用了一个BlockingCollection而不是RabbitMQ。
  public class QueueManager
    {
        public BlockingCollection<Work> blockingCollection = new BlockingCollection<Work>();
        private const int _maxRunningTasks = 3;
        static SemaphoreSlim _sem = new SemaphoreSlim(_maxRunningTasks);
        public void Queue()
        {
            blockingCollection.Add(new Work());
        }
        public void Consume()
        {
            while (true)
            {
                Work work = blockingCollection.Take();
                _sem.Wait();
                Task t = Task.Factory.StartNew(work.DoWork);
            }
        }
        public class Work
        {
            public void DoWork()
            {
                Thread.Sleep(5000);
                _sem.Release();
                Console.WriteLine("Finished work");
            }
        }
    }

和我的测试类

class Test
    {
        static void Main(string[] args)
        {
            Consumer c = new Consumer();
            Task t = Task.Factory.StartNew(c.Consume);
            c.Queue();
            c.Queue();
            c.Queue();
            c.Queue();
            c.Queue();
            Thread.Sleep(1000);
            Console.ReadLine();
        }
    }

要做到这一点,您需要更改ConsumeWithBlocking以支持取消。我不熟悉RabbitMQ,但显然它支持在消费者通道上取消。

所以,而不是做Thread.CurrentThread.AbortToken.Register回调,做正确的事情,并通过适当的RabbitMQ API优雅地取消操作。

在一个侧面说明,你目前试图中止的线程很可能不是被ConsumeWithBlocking阻塞的线程。

相关内容

  • 没有找到相关文章

最新更新