使用并行.ForEach可以加速文件的处理,但不能以正确的顺序返回



所以我试图使用Parallel.ForEach循环来加快我对文件的处理,但我不知道如何使它以有序的方式构建输出。这是我到目前为止的代码:

string[] lines = File.ReadAllLines(fileName);
List<string> list_lines = new List<string>(lines);
Parallel.ForEach(list_lines, async line =>
{
    processedData += await processSingleLine(line);
});

正如你所看到的,它没有任何有序的实现,因为我试图寻找适合我的解决方案的东西,我还没有找到任何我能够接近工作的东西。
所以最好是我想处理每行,但是按照每行发送的顺序建立processedData变量,但是我确实意识到这可能只是超出了我目前的技能水平,所以任何建议都会很好。

编辑:在阅读了下面的答案后,我尝试了两种方法:

ConcurrentDictionary<int, string> result = new ConcurrentDictionary<int, string>();
Parallel.For(0, list.Length, i =>
{
    // process your data and save to dict
    result[i] = processData(lines[i]);
});

ConcurrentDictionary<int, string> result = new ConcurrentDictionary<int, string>();
for (var i = 0; i < lines.Length; i++)
{
    result[i] = lines[i];
}
Array.Clear(lines,0, lines.Length);
Parallel.ForEach(result, line =>
{
    result[line.Key] = encrypt(line.Value, key);
});

然而,两者似乎只使用了大约1核(4核处理器),占任务管理器总数的30%,而在我实现排序之前,它在CPU上使用了近80%。

您可以尝试使用Parallel.For代替Parallel.ForEach。然后就有了线的索引。例如:

string[] lines = File.ReadAllLines(fileName);
// use thread safe collection for catching the results in parallel
ConcurrentDictionary<int, Data> result = new ConcurrentDictionary<int, Data>();
Parallel.For(0, list.Length, i =>
{
    // process your data and save to dict
    result[i] = processData(lines[i]);
});
// having data in dict you can easily retrieve initial order
Data[] orderedData = Data[lines.Length];
for(var i=0; i<lines.Length; i++)
{
    orderedData[i] = result[i];
}

EDIT:正如你的问题下的评论所说,你不能在这里使用异步方法。当您这样做时,Parallel.ForEach将返回一堆任务,而不是结果。如果想并行化异步代码,可以使用多个Task.Run,如下所示:

string[] lines = File.ReadAllLines(fileName);
var tasks = lines.Select(
                 l => Task.Run<Data>(
                         async () => {
                              return await processAsync(l);
                         })).ToList();
var results = await Task.WhenAll(tasks);

注意:应该可以工作,但是没有检查

我相信Parallel.ForEach.AsOrdered()可以满足您的需求。

从代码中取出数据结构list_lines和方法processSingleLine,下面的代码应该保持顺序并并行执行:

var parallelQuery = from line in list_lines.AsParallel().AsOrdered()
                    select processSingleLine(line);
foreach (var processedLine in parallelQuery)
{
    Console.Write(processedLine);
}

最新更新