Parallel.ForEach在.NET6中具有优先级队列



我正试图在Priority Queue上运行Parallel.ForEach,但遇到以下错误:

严重性代码描述项目文件行禁止显示状态错误CS0411无法根据用法推断方法"Parallel.ForEach(OrderablePartitioner,ParallelOptions,Action<TSource,Parallel LoopState,long>("的类型参数。请尝试显式指定类型参数。TPL_POC.PL

我知道如何用IEnumerableList执行Parallel.ForEach,但以下内容不太好。

private void ProcessTasksParallely()
{
PriorityQueue<string, int> activeTasksPriority = new PriorityQueue<string, int>();
foreach (var task in this.tasks)
{
activeTasksPriority.Enqueue(task.Task, task.Id);
}
Console.WriteLine("Processing");
var options = new ParallelOptions { MaxDegreeOfParallelism = (Environment.ProcessorCount / 2) * 10 };
Parallel.ForEach(activeTasksPriority.TryDequeue(out string t, out int priority),
options,
(t, priority) =>
{
Console.WriteLine($" task {priority}, task = {t}, thread = {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(100);
});
}

我尝试这样做是因为我需要并行处理任务,但要根据它们的优先级进行处理。

PriorityQueue<TElement, TPriority>类没有提供将其作为开箱即用的IEnumerable使用的方法。它只有一个UnorderedItems属性,这不是您想要的。此属性在不消耗队列内容的情况下生成队列内容,并且不按特定顺序生成。不过,为PriorityQueue<TElement, TPriority>类实现自定义GetConsumingEnumerable方法很容易,如下所示:

/// <summary>
/// Gets an enumerable sequence that consumes the elements of the queue
/// in an ordered manner.
/// </summary>
public static IEnumerable<(TElement Element, TPriority Priority)>
GetConsumingEnumerable<TElement, TPriority>(
this PriorityQueue<TElement, TPriority> source)
{
while (source.TryDequeue(out TElement element, out TPriority priority))
{
yield return (element, priority);
}
}

用法示例:

var partitioner = Partitioner.Create(activeTasksPriority.GetConsumingEnumerable(),
EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, options, entry =>
{
var (t, priority) = entry;
Console.WriteLine($"Priority: {priority}, Task: {t}");
Thread.Sleep(100);
});

Partitioner.Create+NoBuffering的目的是防止Parallel.ForEach在准备处理元素之前预先消耗元素并将其存储到缓冲区中。

注意:此答案涉及问题中提出的简单场景,其中PriorityQueue<E,P>在开始并行循环之前已完全填充。如果您想在循环运行时在队列中添加更多项目,则不能直接使用PriorityQueue<E,P>,原因有两个:

  1. 这不是一个线程安全的集合
  2. 它没有阻塞功能,因此循环可能在处理所有项目之前提前完成

如果您正在处理这样一个场景,您可以看看这个问题:具有优先级的并发收集。

如果您想在发布/子场景中实现优先级,Parallel.ForEachPriorityQueue<T>都是糟糕的选择。

  • Parallel.ForEach是为数据并行性而构建的,它通过对大量内存中的数据进行分区来处理这些数据,并且每个核心大约使用一个工作任务来处理每个分区,而同步性最低。这里不需要PriorityQueue——如果您想要特定的订单,可以使用PLINQ和OrderBy强制执行
  • 优先级不可避免地会改变项目的感知顺序和队列状态,这对并发性来说是一个很大的禁忌
  • 优先级可能会颠倒。当新的高优先级项目正在等待时,所有工作任务可能都忙于处理低优先级项目。更糟糕的是,Parallel.ForEach使用的默认分区器缓冲项目。这意味着新的高优先级项目可能必须等待多个低优先级项目。您必须使用带有禁用缓冲选项的Partitioner.Create

在高吞吐量网络和消息传递中,优先级处理是通过多个队列而不是单个优先级队列执行的。优先级较高的队列获得更多的资源,或者在优先级较低的队列之前进行处理。

每个优先级类别一个队列

这就是高度可扩展的消息传递系统的工作方式,因为它不需要任何同步来确定下一步要处理的项目。

实现此策略的一种方法是使用多个ActionBlock实例,每个实例具有不同数量的工作任务:

async Task ProcessMessage(string msg) {...}
ExecutionDataflowBlockOptions WithDop(int dop)=>new ExecutionDataflowBlockOptions{ 
MaxDegreeOfParallelism = dop
};

void BuildQueues()
{ 
_highQueue=new ActionBlock<string>(ProcessMessage,WithDop(4));
_midQueue=new ActionBlock<string>(ProcessMessage,WithDop(2));
_lowQueue=new ActionBlock<string>(ProcessMessage,WithDop(1));
}
public void Process(string msg,int priority)
{
var queue= priority switch {
0 => _highQueue,
1 => _midQueue,
_ => _lowQueue
}
queue.Post(msg);    
}
async Task Complete()
{
_highQueue.Complete();
_midQueue.Complete();
_lowQueue.Complete();
await Task.WhenAll(
_hiqhQueue.Completion, 
_midQueue.Completion, 
_lowQueue.Completion
);
}

在这种情况下,Process使用模式匹配将消息路由到适当的ActionBlock

最新更新