TPL数据流同步处理每个文件,但异步处理文件中的每一行



因此,我的用例要求我处理一个文件列表,其中对于列表中的每个文件,我遍历每一行,并对这些行进行一些计算。现在我的问题是,我的缓冲区块中不能有多个文件的行,所以我基本上需要确保一个文件被完全处理(通过一系列数据流块(,然后才能进入第二个文件。

现在我看了一下TPL DataFlow一个接一个的处理,其中的答案是要么完全停止使用TPL数据流,要么将多个处理块封装到一个处理块中,这样我就可以控制它;可堆肥性";正如tpl所提供的那样,将独立的区块集中在一起似乎也有点浪费。有其他方法吗?

我想在叶节点上使用OutputAvailableAsync,在我发布到另一个文件之前,当所有内容都被清除时通知我。但我根本无法让OutputAvailableAsync工作。它只是永远等待。

编辑

接下来,我将有一个带有state的操作块,我计划使用ConcurrentDictionary(对于文件中的每一行,我都有多个需要注意的内容(。现在我不可能索引每一行,因为这意味着我必须保留N个正在一起处理的文件的状态。这里的N可能是要处理的文件数。

这就是我现在所拥有的,请记住,我刚刚编写了一个概念证明。

static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}

您可以利用TPL数据流的条件链接功能,以创建一个部分共享和部分专用的管道。所有文件将共享一个读取器块和一个解析器块,同时为每个文件创建一个专用处理器块。以下是概念的简单演示:

var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
return (line.Id, line.Line?.Split(","));
});
var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
var processor = CreateProcessor(file.Id);
// Create a conditional link from the parser block to the processor block
var link = parser.LinkTo(processor, entry => entry.Id == file.Id);
return File
.ReadLines(file.Path)
.Select(line => (file.Id, line))
.Append((file.Id, null)); // Completion signal
});
ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
var streamWriter = new StreamWriter($@"C:{fileId}.out");
return new ActionBlock<(int Id, string[] LineParts)>(line =>
{
if (line.LineParts == null)
{
streamWriter.Close(); // Completion signal received
return;
}
streamWriter.WriteLine(String.Join("|", line.LineParts));
});
}
reader.LinkTo(parser);

在本例中,每个文件都与一个int Id相关联。该Id与每一行一起传递,以便能够在下游重建文件。值元组用于将每条数据与其原始文件的Id进行组合。在共享parser块和每个专用processor块之间创建条件链路。null有效载荷被用作文件结束指示符。在接收到该信号时,处理器block应当理想地将其自身与parser解除链接,以便将条件链接机制的开销保持在最小。通过处理CCD_ 10方法返回的CCD_。为了简单起见,上面的例子省略了这一重要步骤。

我可能应该在这里重复一下我在前面一个相关问题的回答中已经写过的内容,即在块与块之间传递单个字符串将导致大量开销。为了确保管道尽可能顺利(无摩擦(运行,需要对工作量进行分块(分批(。

最新更新