通道的并发处理



我按照本教程创建一个托管服务。程序按预期运行。但是,我想并发地处理队列项。

在我的应用程序中,有4个客户端,每个客户端一次可以处理4个项目。因此,在任何给定时间,应该并行处理16个项目。

因此,根据这些要求,我对代码进行了一些修改: 在MonitorLoop类中:
private int count = 0;
private async ValueTask MonitorAsync()
{
while (!_cancellationToken.IsCancellationRequested)
{
await _taskQueue.QueueAsync(BuildWorkItem);
Interlocked.Increment(ref count);
Console.WriteLine($"Count: {count}");
}
}

和在同一个类中:

if (delayLoop == 3)
{
_logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
Interlocked.Decrement(ref count);
}

这表明,如果我设置"Capacity"作为4,该值在5之后不会增加。基本上,如果队列已满,它将等待,直到有空间再容纳一个。

问题是每次只处理一个项。

下面是QueuedHostedService类上BackgroundProcessing方法的代码:

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem = await TaskQueue.DequeueAsync(stoppingToken);
try
{
//instead of getting a single item from the queue, somehow, here
//we should be able to process them in parallel for 4 clients
//with a limit for maximum items each client can process
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}

我想并行处理它们。我不确定是否使用Channel作为系统中的队列是最好的解决方案。也许我应该换成ConcurrentQueue。但是,我不确定如何实现一个健壮的实现,可以有4个客户端,每个客户端有4个线程。

如果您想要四个处理器,那么您可以重构代码,使用主循环的四个实例,并使用Task.WhenAll(异步)等待它们全部完成:

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
var task1 = ProcessAsync(stoppingToken);
var task2 = ProcessAsync(stoppingToken);
var task3 = ProcessAsync(stoppingToken);
var task4 = ProcessAsync(stoppingToken);
await Task.WhenAll(task1, task2, task3, task4);
async Task ProcessAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem = await TaskQueue.DequeueAsync(stoppingToken);
try
{
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}
}

我不确定如何实现健壮的实现

如果你想要一个健壮的实现,那么你不能使用那个教程,抱歉。这种后台工作的主要问题是,它会在任何应用重启时丢失。应用程序重启是正常的:服务器可能会断电或崩溃,可以安装操作系统或运行时补丁,IIS会定期回收你的应用程序,每当你部署代码时,应用程序都会重启。当这些事情发生时,所有内存中的队列(如通道)将失去它们所有的工作。

生产质量的实现至少需要一个持久队列。我还建议使用单独的后台处理器。我有一个关于这个主题的博客系列,可以帮助你入门。

相关内容

  • 没有找到相关文章

最新更新