这个问题是如何在一个简单的TPL数据流管道中优化性能的后续问题?
源代码在这里- https://github.com/MarkKharitonov/LearningTPLDataFlow
给定:
- 多个解决方案,涵盖约400个c#项目,包含数千个c#源文件,总计超过10,000,000行代码。
- 包含字符串字面值的文件,每行一个。
我想生成一个JSON文件,列出源代码中出现的所有字面值。对于每一个匹配的行,我希望有以下信息:
- 项目路径 c#文件路径
- 匹配行本身
- 匹配行号
并且所有记录按照各自的字面值键值排列成一个字典。
因此,挑战在于尽可能高效地完成它(当然是在c#中)。
数据流管道可以在这个文件中找到- https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs
在这里:
private void Run(string workspaceRoot, string outFilePath, string[] literals, bool searchAllFiles, int workSize, int maxDOP1, int maxDOP2, int maxDOP3, int maxDOP4)
{
var res = new SortedDictionary<string, List<MatchingLine>>();
var projects = (workspaceRoot + "build\projects.yml").YieldAllProjects();
var progress = new Progress();
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, Environment.ProcessorCount);
var produceCSFiles = new TransformManyBlock<ProjectEx, CSFile>(p => YieldCSFiles(p, searchAllFiles), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDOP1
});
var produceCSFileContent = new TransformBlock<CSFile, CSFile>(CSFile.PopulateContentAsync, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDOP2
});
var produceWorkItems = new TransformManyBlock<CSFile, (CSFile CSFile, int Pos, int Length)>(csFile => csFile.YieldWorkItems(literals, workSize, progress), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDOP3,
TaskScheduler = taskSchedulerPair.ConcurrentScheduler
});
var produceMatchingLines = new TransformManyBlock<(CSFile CSFile, int Pos, int Length), MatchingLine>(o => o.CSFile.YieldMatchingLines(literals, o.Pos, o.Length, progress), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDOP4,
TaskScheduler = taskSchedulerPair.ConcurrentScheduler
});
var getMatchingLines = new ActionBlock<MatchingLine>(o => AddResult(res, o));
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
produceCSFiles.LinkTo(produceCSFileContent, linkOptions);
produceCSFileContent.LinkTo(produceWorkItems, linkOptions);
produceWorkItems.LinkTo(produceMatchingLines, linkOptions);
produceMatchingLines.LinkTo(getMatchingLines, linkOptions);
var progressTask = Task.Factory.StartNew(() =>
{
var delay = literals.Length < 10 ? 1000 : 10000;
for (; ; )
{
var current = Interlocked.Read(ref progress.Current);
var total = Interlocked.Read(ref progress.Total);
Console.Write("Total = {0:n0}, Current = {1:n0}, Percents = {2:P} r", total, current, ((double)current) / total);
if (progress.Done)
{
break;
}
Thread.Sleep(delay);
}
Console.WriteLine();
}, TaskCreationOptions.LongRunning);
projects.ForEach(p => produceCSFiles.Post(p));
produceCSFiles.Complete();
getMatchingLines.Completion.GetAwaiter().GetResult();
progress.Done = true;
progressTask.GetAwaiter().GetResult();
res.SaveAsJson(outFilePath);
}
默认参数为(https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs#L24-L28):
private int m_maxDOP1 = 3;
private int m_maxDOP2 = 20;
private int m_maxDOP3 = Environment.ProcessorCount;
private int m_maxDOP4 = Environment.ProcessorCount;
private int m_workSize = 1_000_000;
我的想法是将工作划分为工作项,其中工作项的大小是通过将各自文件中的行数乘以字符串字面量的计数来计算的。因此,如果c#文件包含500行,那么在其中搜索所有3401字量将得到大小为3401 * 500 = 1700500
的作品。工作单元默认为1000000行,因此在前面的示例中,文件将产生2个工作项:
- 文字1999 0 . .
- 文字2000 . . 3400
,它负责produceWorkItems
阻止生成这些工作项文件。
示例运行:
C:workTPLDataFlow [master ≡]> .binDebugnet5.0TPLDataFlow.exe find-string -d C:xyztip -o c:temp -l C:temp2.txt
Locating all the instances of the 4 literals found in the file C:temp2.txt in the C# code ...
Total = 49,844,516, Current = 49,702,532, Percents = 99.72%
Elapsed: 00:00:18.4320676
C:workTPLDataFlow [master ≡]> .binDebugnet5.0TPLDataFlow.exe find-string -d C:xyztip -o c:temp -l C:temp1.txt
Locating all the instances of the 3401 literals found in the file c:temp1.txt in the C# code ...
Total = 42,379,095,775, Current = 42,164,259,870, Percents = 99.49%
Elapsed: 01:44:13.4289270
许多工作项尺寸不足。如果我有3个c#文件,每个文件20行,那么我当前的代码将产生3个工作项,因为在我当前的实现中,工作项从未跨越文件边界。这是低效的。理想情况下,它们将被批处理到单个工作项中,因为60 * 3401 = 204060 <1000000. 但是BatchBlock
不能在这里使用,因为它希望我提供批大小,我不知道-它取决于管道中的工作项。
如何实现这样的批处理?
我已经意识到一些东西。也许这是显而易见的,但我刚刚弄明白了。如果可以首先缓冲所有项,那么TPL DataFlow库就没有用处。在我的例子中,我可以这样做。这样,我就可以从大到小进行缓冲和排序。这样,一个简单的Parallel.ForEach
将做的工作很好。意识到,我改变了我的实现使用这样的反应:
第1阶段-获取所有项,这是所有IO所在的位置
var input = (workspaceRoot + "build\projects.yml")
.YieldAllProjects()
.ToObservable()
.Select(project => Observable.FromAsync(() => Task.Run(() => YieldFiles(project, searchAllFiles))))
.Merge(2)
.SelectMany(files => files)
.Select(file => Observable.FromAsync(file.PopulateContentAsync))
.Merge(10)
.ToList()
.GetAwaiter().GetResult()
.AsList();
input.Sort((x, y) => y.EstimatedLineCount - x.EstimatedLineCount);
阶段2 -找到所有匹配的行(仅限CPU)
var res = new SortedDictionary<string, List<MatchingLine>>();
input
.ToObservable()
.Select(file => Observable.FromAsync(() => Task.Run(() => file.YieldMatchingLines(literals, 0, literals.Count, progress).ToList())))
.Merge(maxDOP.Value)
.ToList()
.GetAwaiter().GetResult()
.SelectMany(m => m)
.ForEach(m => AddResult(res, m));
所以,即使我有数百个项目,数千个文件和数百万行代码-这不是TPL DataFlow的规模,因为我的工具可以读取所有文件到内存中,以有利的顺序重新排列,然后处理。
关于第一个问题(配置管道),我真的无法提供任何指导。对我来说,优化数据流管道的参数似乎是一种黑色艺术!
关于第二个问题(如何批处理由编译时大小未知的工作项组成的工作负载),您可以使用下面的自定义BatchBlock<T>
。它使用DataflowBlock.Encapsulate
方法将两个数据流块合并为一个。ActionBlock<T>
中的第一个块接收输入并将其放入缓冲区,第二个块是BufferBlock<T[]>
,它保存批处理项并将它们向下传播。weightSelector
是一个lambda,它返回每个接收到的物品的权重。当累积的权重超过batchWeight
阈值时,发出一个批处理。
public static IPropagatorBlock<T, T[]> CreateDynamicBatchBlock<T>(
int batchWeight, Func<T, int> weightSelector,
DataflowBlockOptions options = null)
{
// Arguments validation omitted
options ??= new DataflowBlockOptions();
var outputBlock = new BufferBlock<T[]>(options);
List<T> buffer = new List<T>();
int sumWeight = 0;
var inputBlock = new ActionBlock<T>(async item =>
{
checked
{
int weight = weightSelector(item);
if (weight + sumWeight > batchWeight && buffer.Count > 0)
await SendBatchAsync();
buffer.Add(item);
sumWeight += weight;
if (sumWeight >= batchWeight) await SendBatchAsync();
}
}, new()
{
BoundedCapacity = options.BoundedCapacity,
CancellationToken = options.CancellationToken,
TaskScheduler = options.TaskScheduler,
MaxMessagesPerTask = options.MaxMessagesPerTask,
NameFormat = options.NameFormat
});
PropagateCompletion(inputBlock, outputBlock, async () =>
{
if (buffer.Count > 0) await SendBatchAsync();
});
Task SendBatchAsync()
{
var batch = buffer.ToArray();
buffer.Clear();
sumWeight = 0;
return outputBlock.SendAsync(batch);
}
static async void PropagateCompletion(IDataflowBlock source,
IDataflowBlock target, Func<Task> postCompletionAction)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
Exception ex =
source.Completion.IsFaulted ? source.Completion.Exception : null;
try { await postCompletionAction(); }
catch (Exception actionError) { ex = actionError; }
if (ex != null) target.Fault(ex); else target.Complete();
}
return DataflowBlock.Encapsulate(inputBlock, outputBlock);
}
使用例子:
var batchBlock = CreateDynamicBatchBlock<WorkItem>(1_000_000, wi => wi.Size);
如果权重int
类型没有足够的范围和溢出,您可以切换到long
或double
。