我按照本教程创建一个托管服务。程序按预期运行。但是,我想并发地处理队列项。
在我的应用程序中,有4个客户端,每个客户端一次可以处理4个项目。因此,在任何给定时间,应该并行处理16个项目。
因此,根据这些要求,我对代码进行了一些修改: 在MonitorLoop类中:private int count = 0;
private async ValueTask MonitorAsync()
{
while (!_cancellationToken.IsCancellationRequested)
{
await _taskQueue.QueueAsync(BuildWorkItem);
Interlocked.Increment(ref count);
Console.WriteLine($"Count: {count}");
}
}
和在同一个类中:
if (delayLoop == 3)
{
_logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
Interlocked.Decrement(ref count);
}
这表明,如果我设置"Capacity"作为4,该值在5之后不会增加。基本上,如果队列已满,它将等待,直到有空间再容纳一个。
问题是每次只处理一个项。
下面是QueuedHostedService
类上BackgroundProcessing
方法的代码:
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem = await TaskQueue.DequeueAsync(stoppingToken);
try
{
//instead of getting a single item from the queue, somehow, here
//we should be able to process them in parallel for 4 clients
//with a limit for maximum items each client can process
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}
我想并行处理它们。我不确定是否使用Channel
作为系统中的队列是最好的解决方案。也许我应该换成ConcurrentQueue
。但是,我不确定如何实现一个健壮的实现,可以有4个客户端,每个客户端有4个线程。
如果您想要四个处理器,那么您可以重构代码,使用主循环的四个实例,并使用Task.WhenAll
(异步)等待它们全部完成:
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
var task1 = ProcessAsync(stoppingToken);
var task2 = ProcessAsync(stoppingToken);
var task3 = ProcessAsync(stoppingToken);
var task4 = ProcessAsync(stoppingToken);
await Task.WhenAll(task1, task2, task3, task4);
async Task ProcessAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem = await TaskQueue.DequeueAsync(stoppingToken);
try
{
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}
}
如果你想要一个健壮的实现,那么你不能使用那个教程,抱歉。这种后台工作的主要问题是,它会在任何应用重启时丢失。应用程序重启是正常的:服务器可能会断电或崩溃,可以安装操作系统或运行时补丁,IIS会定期回收你的应用程序,每当你部署代码时,应用程序都会重启。当这些事情发生时,所有内存中的队列(如通道)将失去它们所有的工作。我不确定如何实现健壮的实现
生产质量的实现至少需要一个持久队列。我还建议使用单独的后台处理器。我有一个关于这个主题的博客系列,可以帮助你入门。