我正在尝试通过将处理后的数据添加到BlockingCollection
Parallel.ForEach
来处理大量文本文件。
问题是我希望 taskWriteMergedFile Task
使用集合并至少每 800000 行将它们写入结果文件。
我想我无法在迭代中测试集合大小,因为它是并行的,所以我创建了Task
.
在这种情况下,我可以将任务中的 while(true) 循环转换为 EventWaitHandle
吗?
const int MAX_SIZE = 1000000;
static BlockingCollection<string> mergeData;
mergeData = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_SIZE);
string[] FilePaths = Directory.GetFiles("somepath");
var taskWriteMergedFile = new Task(() =>
{
while ( true )
{
if ( mergeData.Count > 800000)
{
String.Join(System.Environment.NewLine, mergeData.GetConsumingEnumerable());
//Write to file
}
Thread.Sleep(10000);
}
}, TaskCreationOptions.LongRunning);
taskWriteMergedFile.Start();
Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));
mergeData.CompleteAdding();
你可能不想那样做。相反,让您的任务在收到文件时将每一行写入文件。如果要将文件大小限制为 80,000 行,则在写入第 80,000 行后,关闭当前文件并打开一个新文件。
想想看,您拥有的东西不起作用,因为GetConsumingEnumerable()
在集合标记为完成以进行添加之前不会停止。将发生的情况是,该事物将通过睡眠循环,直到队列中有 80,000 个项目,然后它会阻塞String.Join
,直到主线程调用CompleteAdding
。如果有足够的数据,您的内存就会不足。
此外,除非您有很好的理由,否则您不应该在此处使用ConcurrentBag
。只需使用 BlockingCollection
的默认值,即 ConcurrentQueue
。 ConcurrentBag
是一种相当特殊的数据结构,其性能不如ConcurrentQueue
。
因此,您的任务变为:
var taskWriteMergedFile = new Task(() =>
{
int recordCount = 0;
foreach (var line in mergeData.GetConsumingEnumerable())
{
outputFile.WriteLine(line);
++recordCount;
if (recordCount == 80,000)
{
// If you want to do something after 80,000 lines, do it here
// and then reset the record count
recordCount = 0;
}
}
}, TaskCreationOptions.LongRunning);
当然,前提是您已在其他地方打开了输出文件。最好在任务开始时打开输出,并在foreach
退出后将其关闭。
另一方面,您可能不希望生产者循环是并行的。你有:
Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));
我不确定AddToDataPool
在做什么,但如果它是读取文件并将数据写入集合,那么您会遇到一些问题。首先,磁盘驱动器一次只能做一件事,因此它最终会读取一个文件的一部分,然后是另一个文件的一部分,然后是另一个文件的一部分,依此类推。为了读取下一个文件的每个块,它必须将磁头寻找到正确的位置。磁盘磁头寻道非常昂贵 - 5毫秒或更长时间。CPU 时间的永恒。除非您要执行比读取文件花费更长的繁重处理时间,否则您几乎总是最好一次处理一个文件。除非您可以保证输入文件位于单独的物理磁盘上。
第二个潜在问题是,在运行多个线程的情况下,无法保证将内容写入集合的顺序。当然,这可能不是问题,但是如果您希望单个文件中的所有数据在输出中组合在一起,则不会发生多个线程,每个线程将多行写入集合。
只是要记住的事情。