TPL数据流:ActionBlock,避免在每次调用其委托时重复运行using块(例如写入StreamWriter)



我需要从IDataReader读取1M行,同时写入n文本文件。这些文件中的每一个都将是可用列的不同子集;所有n文本文件在完成时将为1M行长。

当前计划是一个TransformManyBlock来迭代IDataReader,链接到BroadcastBlock,链接到nBufferBlock/ActionBlock对。

我试图避免的是让我的ActionBlock委托执行using (StreamWriter x...) { x.WriteLine(); },它会打开和关闭每个输出文件一百万次。

我目前的想法是代替ActionBlock,编写一个实现ITargetBlock<gt;。有没有更简单的方法?

第1版:这个讨论对我目前的问题很有价值,但到目前为止,答案都集中在文件系统行为上。为了将来的搜索者的利益,问题的重点是如何在ActionBlock委托之外构建某种设置/拆卸。这将适用于任何类型的一次性用品,你通常会包装在使用块中。

第2版:根据@Panagiotis Kanavos,解决方案的执行摘要是在定义块之前设置对象,然后在块的完成中拆除对象。继续

即使不必每次都打开流,一次写入一行文件本身也很昂贵。保持文件流打开还有其他问题,因为出于性能原因,文件流始终是缓冲的,从FileStream级别一直到文件系统驱动程序。您必须定期刷新流,以确保数据已写入磁盘。

为了真正提高性能,您必须对记录进行批处理,例如使用BatchBlock。一旦这样做,打开流的成本就可以忽略不计了。

行也应该在最后一刻生成,以避免生成需要垃圾收集的临时字符串。在n*1M记录的情况下,这些分配和垃圾回收的内存和CPU开销将非常严重。

日志库在写入之前对日志条目进行批处理,以避免这种性能影响。

你可以试试这样的东西:

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {

//Create or open a file for appending
using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
writer.WriteLine("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);

或者,使用异步方法

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => {

//Create or open a file for appending
await using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);

您可以调整批处理大小和StreamWriter的缓冲区大小以获得最佳性能。

创建一个实际的";块";写入流

可以使用自定义数据流块演练中显示的技术创建自定义块,而不是创建实际的自定义块,而是创建一些返回LinkTo工作所需的东西,在这种情况下是ITargetBlock< T>:

ITargetBlock<Record> FileExporter(string path)
{
var writer=new StreamWriter(path,true);
var block=new ActionBlock<Record>(async msg=>{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
});
//Close the stream when the block completes
block.Completion.ContinueWith(_=>write.Close());
return (ITargetBlock<Record>)target;
}
...

var exporter1=CreateFileExporter(path1);
previous.LinkTo(exporter,options);

";技巧";这里的流是在块之外创建的,并且在块完成之前一直保持活动状态。它不是垃圾收集的,因为它被其他代码使用。当块完成时,无论发生什么,我们都需要显式地关闭它。无论块是否正常完成,block.Completion.ContinueWith(_=>write.Close());都将关闭流。

这与演练中使用的关闭输出BufferBlock:的代码相同

target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});

流在默认情况下是缓冲的,所以调用WriteLine并不意味着数据将实际写入磁盘。这意味着我们不知道数据何时会真正写入文件。如果应用程序崩溃,一些数据可能会丢失。

内存、IO和开销

当在相当长的一段时间内处理1M行时,事情就会累积起来。可以使用例如File.AppendAllLinesAsync一次写入一批行,但这将导致分配1M个临时字符串。在每次迭代中,运行时必须使用至少作为批处理的临时字符串的RAM。在GC启动冻结线程之前,RAM的使用量将开始膨胀到数百MB,然后是GB。

对于1M行和大量数据,很难调试和跟踪管道中的数据。如果出现问题,事情可能会很快崩溃。例如,想象一下,由于一条消息被阻止,1M条消息被困在一个块中。

重要的是(出于健全性和性能的原因)保持管道中的各个组件尽可能简单。

通常在使用TPL时,我会创建自定义类,这样我就可以创建专用成员变量和专用方法,用于管道中的块,但我不会实现ITargetBlockISourceBlock,而是在自定义类中拥有我需要的任何块,然后我将CCD_ 10和/或CCD_。

相关内容

最新更新