NET Core请求,如何将任务添加到后台工作队列并等待结果



我有一个worker服务,它将使用Cv2 Dnn执行图像处理。它被优化为使用所有可用的CPU,并且运行它的多个实例会降低性能。虽然它运行得非常快,但如果同时添加两个图像,可能会导致Dnn崩溃。

出于这些原因,我想有一个后台工作人员从队列顺序读取。所有传入的请求都将它们的图像添加到这个队列中,并等待结果。

我希望实现的工作流程:

  1. 从API控制器接收图像,将图像推送到队列并等待
  2. 后台工作程序依次从队列中读取图像,处理图像,返回结果
  3. 控制器接收处理结果并返回图像

到目前为止,我已经看到了许多将项目推入队列以进行处理的方法,基本上是作为"立即处理",但没有关于如何等待项目完成处理的方法。例如,在Microsoft关于使用托管服务的后台任务的文档中,他们的队列后台任务示例是简单地将一些任务推送到队列中,然后一个服务在添加任务时读取任务,但调用代码不等待任务完成

你可以使用taskcompetitionsource。考虑这个简单的后台服务,只有一个后台线程:

using System;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

public class BackgroundService
{
BlockingCollection<ImageQueueItem> _imagesToProcess = new();


public BackgroundService()
{
var thread = new Thread(ProcessImagesThread);
thread.Start();
}

public Task<ImageProcessingResult> ProcessImageQueuedAsync(Image image)
{
var taskCompetitionSource = new TaskCompletionSource<ImageProcessingResult>();
var queueItem = new ImageQueueItem
{
ImageToProcess = image,
ProcessingResult = taskCompetitionSource,
};
_imagesToProcess.Add(queueItem);
return taskCompetitionSource.Task;
}


// background thread that processes images
private void ProcessImagesThread()
{
while(true)
{
var queueItem =_imagesToProcess.Take();
var image = queueItem.ImageToProcess;
var result = new ImageProcessingResult();

try
{
// .............................................................
// process image here and set some properties to result variable
// .............................................................
// complete task by setting result
queueItem.ProcessingResult.SetResult(result);
}
catch(Exception e)
{
// or fail task if processing failed
queueItem.ProcessingResult.SetException(e);
}

}
}
}
// Some data about image to process
public class Image{}
// Some data to return from controller after processing image
public class ImageProcessingResult{}
public class ImageQueueItem
{
public Image ImageToProcess { get; set; }
public TaskCompletionSource<ImageProcessingResult> ProcessingResult { get; set; }
}

这个服务可以这样使用:

// awaiting this will wait until Image is processed
var result = await backgroundService.ProcessImageQueuedAsync()