.NET 6.并行处理,当运行的线程少于MaxDegreesOfParallelism时,重新填充队列



我想使用BackgroundService并行处理一些数据。

数据库中可能有超过MaxDegreesOfParallelism个项目要处理,但由于每行处理可能需要几分钟的时间,我希望只将实际可以处理的项目加载到本地处理队列中,这样我就可以避免潜在的过时数据。

例如,如果数据库中有20个项目要处理,而MaxDegreesOfParallelism(允许的并发处理作业数(为10,则我只想加载10个项目。几分钟后,可以对前3个项目进行处理。现在,我想从数据库中加载接下来的3个项目。当然,当程序启动时,我可以加载所有20个项目,但当";处理时隙";则这20个项目可能不再反映数据库的当前状态。

我想我可以用ConcurrentQueue<T>来做这件事——但我怎么知道什么时候";处理时隙";是否可用?这不一定是立即的,如果";数据加载";服务/线程可以每隔一分钟左右检查多少个"线程";处理时隙";并且加载要添加到处理队列的相应数量的项目。

或者我应该只使用Task.WhenAny,这样我就可以在处理完第一个项目后立即加载其他项目?但我仍然需要知道有多少";槽";可用。(但我可以用数据库来跟踪,嗯…(

我假设您使用的是TPL数据流。您可以使用具有BoundedCCapacity的ActionBlock来限制排队的项目数。

ActionBlock将等待SendAsync调用。当缓冲区填满时,SendAsync将被阻止(防止提取其他记录,直到记录完成处理,允许SendAsync推送下一条记录。(

Action<int> fn = record => {
Thread.Sleep(1000); // record processing task goes here.
Console.WriteLine(record);
};
var opts = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 2 };
// Sets the block's buffer size to one message
var actionBlock = new ActionBlock<int>(fn, opts);
while (true /* add your exit condition here */) {
var record = GetFromDB(index++);
await actionBlock.SendAsync(record);  
// After first 5 records, the bounded capacity will prevent additional records from being fetched until one of the existing records gets processed.
}

最新更新