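Introduction
I am trying to solve the following problem:
- Given a C# type X with one or more publicly visible constants, which C# types in the solution depend on any of the constants in type X?
Because constants are inlined at compile time, inspecting the binaries is pointless. We need to inspect the source code using the Roslyn API.
I am not going to use semantic analysis, because it would be very expensive. Instead, I will use a regular expression to check whether a given file appears to use the constants, and then verify with the syntax tree. Not bulletproof, but good enough and relatively fast.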
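The regex pre-filter described above could look roughly like the following. The question does not show how its regex is built, so this is only a sketch under assumptions: the helper name `ConstantRegexSketch.Build` and the sample names `Limits` and `MaxSize` are hypothetical, and the pattern only catches the qualified form `TypeName.ConstantName`.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

// Hypothetical helper, not from the original post.
public static class ConstantRegexSketch
{
    // Builds a pattern of the form \bType\s*\.\s*(?:ConstA|ConstB)\b
    public static Regex Build(string typeName, params string[] constantNames)
    {
        // Escape every name so it is matched literally.
        var names = string.Join("|", constantNames.Select(Regex.Escape));
        var pattern = $@"\b{Regex.Escape(typeName)}\s*\.\s*(?:{names})\b";
        return new Regex(pattern, RegexOptions.Compiled);
    }
}
```

A pattern like this misses unqualified uses (for example inside the type itself or through `using static`) and also matches text inside comments and strings, which is why each match still has to be verified against the syntax tree.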
The current statistics are:
- Project count: 333
- File count: 45280
- SSD
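My implementation
The overall scheme is:
- Produce a stream of the relevant Microsoft.Build.Evaluation.Project objects, which will give us the lists of C# files
- From the C# files, produce a stream of C# file contents
- Match each C# file content against the given regular expression and, for each match, use the syntax tree of that file content to determine whether the relevant constant is actually used. If so, report the respective type.
I do not want to go into too much detail, but on the other hand I want to provide enough detail to explain my difficulties. It is not much, so please bear with me.
I use a few small helper types:
ProjectItem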
private class ProjectItem
{
    public readonly string AssemblyName;
    public readonly CSharpParseOptions ParseOptions;
    public readonly IEnumerable<string> CSFilePaths;

    public ProjectItem(TypeMap typeMap, string asmName)
    {
        AssemblyName = asmName;
        var asmProps = typeMap.Assemblies[asmName];
        ParseOptions = asmProps.GetParseOptions();
        CSFilePaths = new Project(asmProps.ProjectPath)
            .GetItems("Compile")
            .Select(item => item.GetMetadataValue("FullPath"));
    }

    public IEnumerable<TextItem> YieldTextItems() =>
        CSFilePaths.Select(csFilePath => new TextItem(this, csFilePath, File.ReadAllText(csFilePath)));
}
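where TypeMap is a registry of all the types and assemblies used in our solution. Some other code has built it previously. Think of it as an oracle that can answer questions such as "give me the parse options (or the project path) for the given assembly". But it does not specify the list of C# files used by a project. For that we need to instantiate the respective Microsoft.Build.Evaluation.Project instance, which is expensive.
TextItem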
private class TextItem
{
    public readonly string AssemblyName;
    public readonly CSharpParseOptions ParseOptions;
    public readonly string CSFilePath;
    public readonly string Text;

    public TextItem(ProjectItem item, string csFilePath, string text)
    {
        AssemblyName = item.AssemblyName;
        ParseOptions = item.ParseOptions;
        CSFilePath = csFilePath;
        Text = text;
    }

    public IEnumerable<TypeDefKey> YieldDependentTypes(TypeMap typeMap, TypeDefKey constTypeDefKey, Regex regex)
    {
        ...
        SyntaxTree syntaxTree = null;
        foreach (Match m in regex.Matches(Text))
        {
            // Parse lazily: only files with at least one regex match are parsed
            if (syntaxTree == null)
            {
                syntaxTree = CSharpSyntaxTree.ParseText(Text, ParseOptions, CSFilePath);
                ...
            }
            ...
            if (IsTheRegexMatchIndeedCorrespondsToTheGivenConstantType(syntaxTree, ...))
            {
                var typeDefKey = GetTheType(syntaxTree, ...);
                yield return typeDefKey;
            }
        }
    }
}
Given the types above, I came up with this simple TPL Dataflow pipeline:
var regex = GetRegex(...);
var dependentAssemblies = GetDependentAssemblies(...);
var dependentTypes = new ConcurrentDictionary<TypeDefKey, object>();

var produceCSFilePaths = new TransformManyBlock<ICollection<string>, ProjectItem>(
    asmNames => asmNames.Select(asmName => new ProjectItem(typeMap, asmName)));
var produceCSFileText = new TransformManyBlock<ProjectItem, TextItem>(
    p => p.YieldTextItems());
var produceDependentTypes = new TransformManyBlock<TextItem, TypeDefKey>(
    t => t.YieldDependentTypes(typeMap, constTypeDefKey, regex));
var getDependentTypes = new ActionBlock<TypeDefKey>(
    typeDefKey => dependentTypes.TryAdd(typeDefKey, null));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
produceCSFilePaths.LinkTo(produceCSFileText, linkOptions);
produceCSFileText.LinkTo(produceDependentTypes, linkOptions);
produceDependentTypes.LinkTo(getDependentTypes, linkOptions);

produceCSFilePaths.Post(dependentAssemblies);
produceCSFilePaths.Complete();
getDependentTypes.Completion.Wait();
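Problems and questions
- It is slow: it takes about 50 seconds, and CPU utilization is low. I realize there is a lot of IO here, but the CPU is still involved in applying the regular expression and parsing the content into the respective syntax tree.
- I do not understand how to use TransformManyBlock with async IO. The ProjectItem.YieldTextItems function could return IObservable<TextItem> or IAsyncEnumerable<TextItem>, but TransformManyBlock does not recognize either. I am new to TPL Dataflow, so it is not clear to me how to work around this. That is why I use the blocking File.ReadAllText rather than File.ReadAllTextAsync.
- I think my pipeline uses ThreadPool threads (through the default TaskScheduler), but shouldn't it use real threads, like those created with Task.Factory.StartNew(..., TaskCreationOptions.LongRunning);? So, does it use the "right" threads? If not, how do I fix it? I have seen recommendations to implement a custom TaskScheduler, but I could not find an example. The existing ones seem to rely on internal implementation details, so it is unclear how to proceed.
- I tried to increase MaxDegreeOfParallelism for the ProjectItem and TextItem production, since these two are mostly IO-bound and thus much slower than the last part, which inspects the C# file content. But it did not yield much of a performance improvement. My understanding is that the slower a pipeline stage is, the more parallelism it should have. On the other hand, I do not know how much parallelism is possible when reading from an SSD. It is also unclear how to profile it.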
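Below is a drop-in replacement for the TransformManyBlock<TInput,TOutput> dataflow component, with a constructor that accepts a Func<TInput, IAsyncEnumerable<TOutput>> delegate. It has the full API and supports all the options (but see the caveat at the end regarding BoundedCapacity). Internally it is composed of three linked components: a TransformBlock, an ActionBlock and a BufferBlock: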
public class TransformManyBlockEx<TInput, TOutput>
    : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
    private readonly TransformBlock<TInput, (long, TInput)> _input;
    private readonly ActionBlock<(long, TInput)> _transformer;
    private readonly BufferBlock<TOutput> _output;
    private readonly Dictionary<long, (Queue<TOutput> Queue, bool Completed)> _byIndex;
    private readonly CancellationToken _cancellationToken;
    private long currentIndex = 0L;
    private long minIndex = 0L;

    public TransformManyBlockEx(Func<TInput, IAsyncEnumerable<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        // Arguments validation omitted
        dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();
        _cancellationToken = dataflowBlockOptions.CancellationToken;
        if (dataflowBlockOptions.EnsureOrdered)
            _byIndex = new Dictionary<long, (Queue<TOutput>, bool)>();

        _input = new TransformBlock<TInput, (long, TInput)>(item =>
        {
            return (currentIndex++, item); // No parallelism here
        }, new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            CancellationToken = _cancellationToken
        });

        _transformer = new ActionBlock<(long, TInput)>(async entry =>
        {
            var (index, item) = entry;
            Queue<TOutput> queue = null;
            if (_byIndex != null)
            {
                // EnsureOrdered is enabled
                queue = new Queue<TOutput>();
                lock (_byIndex) _byIndex.Add(index, (queue, false));
            }
            var resultSequence = transform(item);
            await foreach (var result in resultSequence
                .WithCancellation(_cancellationToken))
            {
                if (_byIndex != null)
                {
                    lock (queue) queue.Enqueue(result);
                    if (!await SendPendingResultsAsync()) return;
                }
                else
                {
                    if (!await _output.SendAsync(result, _cancellationToken)) return;
                }
            }
            if (_byIndex != null)
            {
                lock (_byIndex) _byIndex[index] = (queue, true); // Mark as completed
                await SendPendingResultsAsync();
            }
        }, dataflowBlockOptions);

        _input.LinkTo(_transformer, new() { PropagateCompletion = true });

        _output = new BufferBlock<TOutput>(dataflowBlockOptions);

        Task transformerPostCompletion = _transformer.Completion.ContinueWith(t =>
        {
            if (_byIndex != null)
            {
                int pendingCount;
                lock (_byIndex)
                {
                    pendingCount = _byIndex.Count;
                    _byIndex.Clear(); // Cleanup
                }
                if (t.IsCompletedSuccessfully && pendingCount > 0)
                    throw new InvalidOperationException(
                        "The transformer completed before emitting all queued results.");
            }
        }, TaskScheduler.Default);

        // The Task.WhenAll aggregates nicely the exceptions of the two tasks
        PropagateCompletion(
            Task.WhenAll(_transformer.Completion, transformerPostCompletion),
            _output);
    }

    private static async void PropagateCompletion(Task sourceCompletion,
        IDataflowBlock target)
    {
        try { await sourceCompletion.ConfigureAwait(false); } catch { }
        var ex = sourceCompletion.IsFaulted ? sourceCompletion.Exception : null;
        if (ex != null) target.Fault(ex); else target.Complete();
    }

    private async Task<bool> SendPendingResultsAsync()
    {
        // Returns false in case the BufferBlock rejected a result
        // This may happen in case of cancellation
        while (TrySendNextPendingResult(out var sendTask))
        {
            if (!await sendTask) return false;
        }
        return true;
    }

    private bool TrySendNextPendingResult(out Task<bool> sendTask)
    {
        // Returns false in case currently there is no pending result
        sendTask = null;
        lock (_byIndex)
        {
            while (true)
            {
                if (!_byIndex.TryGetValue(minIndex, out var entry))
                    return false; // The next queue is not in the dictionary yet

                var (queue, completed) = entry; // We found the next queue

                lock (queue)
                {
                    if (queue.TryDequeue(out var result))
                    {
                        // We found the next result
                        // Send the result while holding the lock on _byIndex
                        // The BufferBlock respects the order of submitted items
                        sendTask = _output.SendAsync(result, _cancellationToken);
                        return true;
                    }
                }

                // Currently the queue is empty
                // If it's not completed yet, return. It may have more items later.
                if (!completed) return false;

                // OK, the queue is now both empty and completed
                _byIndex.Remove(minIndex); // Remove it
                minIndex++; // Continue with the next queue in order
            }
        }
    }

    public TransformManyBlockEx(Func<TInput, Task<IEnumerable<TOutput>>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
        : this(ToAsyncEnumerable(transform), dataflowBlockOptions) { }

    public TransformManyBlockEx(Func<TInput, IEnumerable<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
        : this(ToAsyncEnumerable(transform), dataflowBlockOptions) { }

    public Task Completion => _output.Completion;
    public void Complete() => _input.Complete();
    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_input).Fault(exception);

    public int InputCount
        => _input.InputCount + _input.OutputCount + _transformer.InputCount;

    public int OutputCount
    {
        get
        {
            int count = _output.Count;
            if (_byIndex == null) return count;
            lock (_byIndex) return count + _byIndex.Values
                .Select(e => { lock (e.Queue) return e.Queue.Count; }).Sum();
        }
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
        => _output.LinkTo(target, linkOptions);

    public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
        => ((IReceivableSourceBlock<TOutput>)_output).TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<TOutput> items)
        => ((IReceivableSourceBlock<TOutput>)_output).TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader messageHeader, TInput messageValue,
        ISourceBlock<TInput> source, bool consumeToAccept)
        => ((ITargetBlock<TInput>)_input).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target, out bool messageConsumed)
        => ((ISourceBlock<TOutput>)_output).ConsumeMessage(messageHeader, target,
            out messageConsumed);

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target)
        => ((ISourceBlock<TOutput>)_output).ReserveMessage(messageHeader, target);

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target)
        => ((ISourceBlock<TOutput>)_output).ReleaseReservation(messageHeader,
            target);

    private static Func<TInput, IAsyncEnumerable<TOutput>> ToAsyncEnumerable(
        Func<TInput, Task<IEnumerable<TOutput>>> transform)
    {
        return (item) => Iterator(item);
        async IAsyncEnumerable<TOutput> Iterator(TInput item)
        {
            foreach (var result in await transform(item)) yield return result;
        }
    }

    private static Func<TInput, IAsyncEnumerable<TOutput>> ToAsyncEnumerable(
        Func<TInput, IEnumerable<TOutput>> transform)
    {
        return (item) => Iterator(item);
        async IAsyncEnumerable<TOutput> Iterator(TInput item)
        {
            foreach (var result in transform(item)) yield return result;
            await Task.CompletedTask; // Suppress CS1998
        }
    }
}
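The main difficulty lies in the EnsureOrdered option, because we cannot rely on the built-in functionality of any of the composed blocks. The chosen solution is based on ideas from this question: a Dictionary with long keys, in which the results are stored in their original order, and a long counter that holds the minimum index that has not been emitted yet.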
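The reordering idea can be illustrated in isolation. The following is a simplified, single-threaded sketch and not part of the class above: the name `ReorderBuffer<T>` is hypothetical, and it stores one result per index, whereas TransformManyBlockEx keeps a queue of results per index and guards the dictionary with locks.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of "dictionary keyed by index + minimum index counter".
public sealed class ReorderBuffer<T>
{
    private readonly Dictionary<long, T> _pending = new Dictionary<long, T>();
    private long _minIndex = 0L; // Smallest index that has not been emitted yet

    // Stores a result and returns all results that can now be emitted in order.
    public List<T> Add(long index, T result)
    {
        _pending.Add(index, result);
        var ready = new List<T>();
        // Drain consecutive entries starting at the minimum index
        while (_pending.TryGetValue(_minIndex, out var next))
        {
            _pending.Remove(_minIndex);
            ready.Add(next);
            _minIndex++;
        }
        return ready;
    }
}
```

Results that arrive early simply wait in the dictionary until every lower index has been emitted, which also shows why this buffer can grow without bound when one early item is slow.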
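This implementation has the usual drawbacks of all custom dataflow components that are built by composition:
- Exceptions are wrapped in an additional AggregateException. This usually requires calling Flatten on the exception propagated by the last block in the pipeline.
- The BoundedCapacity is multiplied by the number of linked internal components. The effective BoundedCapacity of the TransformManyBlockEx is three times the value passed in the options, plus the results stored in the internal reordering buffer (when the EnsureOrdered option is enabled). This buffer is effectively unbounded. Unfortunately, fixing this flaw is not trivial.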
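As a small illustration of the first drawback, nested AggregateException instances can be collapsed with AggregateException.Flatten. The helper below is only a sketch; the name `FlattenSketch.Unwrap` is hypothetical and is not part of the answer's code.

```csharp
using System;

// Hypothetical helper for unwrapping nested AggregateExceptions.
public static class FlattenSketch
{
    public static Exception Unwrap(Exception ex)
    {
        if (ex is AggregateException aggregate)
        {
            // Flatten collapses nested AggregateExceptions into a single level
            var flat = aggregate.Flatten();
            // If exactly one real exception remains, surface it directly
            if (flat.InnerExceptions.Count == 1) return flat.InnerExceptions[0];
            return flat;
        }
        return ex;
    }
}
```

A helper like this could be applied to the exception observed from the Completion task of the last block in the pipeline.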