嵌套 GNU 并行处理多个大文件,并将每个文件数据拆分为队列处理



我有一个目录,里面有近 100 个日志文件,每个日志文件重 10~15 GB。要求是逐行读取每个文件(顺序根本不重要(,清理 json 行并将其转储到后端 elasticsearch 存储进行索引。

这是我做这项工作的工人

# file = worker.php
echo " -- New PHP Worker Started -- "; // to get how many times gnu-parallel initiated the worker
$dataSet = [];
while (false !== ($line = fgets(STDIN))) {
// convert line text to json
$l = json_decode($line);
$dataSet[] = $l;
if(sizeof($dataSet) >= 1000) {
//index json to elasticsearch
$elasticsearch->bulkIndex($dataSet);
$dataSet = []; 
}
}

在这里和这里的答案的帮助下,我几乎就在那里,它正在工作(有点(,但只需要确保在引擎盖下它实际上正在做我假设它正在做的事情。

只需一个文件,我就可以按如下方式处理它

parallel --pipepart -a 10GB_input_file.txt  --round-robin php worker.php 

添加 --round-robin 可确保 PHP worker 进程只启动一次,然后它只是继续接收数据作为管道(穷人的队列(。

因此,对于 4CPU 机器,它会启动 4 个 php 工作线程并非常快速地处理所有数据。

要对所有文件执行相同的操作,这是我的看法

find /data/directory -maxdepth 1 -type f | parallel cat | parallel --pipe -N10000 --round-robin php worker.php 

这看起来有点有效,但我有一种直觉,这是为所有文件并行嵌套的错误方式。

其次,由于它不能使用--pipepart,我认为它更慢。

第三,一旦工作完成,我看到在 4cpu 机器上,只有 4 个工人启动并且工作完成了。这是正确的行为吗?它不应该为每个文件启动 4 个工作线程吗?只是想确保我没有错过任何数据。

知道如何以更好的方式做到这一点吗?

如果它们的大小大致相同,为什么不简单地给每个文件一个文件:

find /data/directory -maxdepth 1 -type f |
parallel php worker.php '<' {}

另一种方法是对它们中的每一个使用--pipepart

do_one() {
parallel --pipepart -a "$1" --block -1 php worker.php
}
export -f do_one
find /data/directory -maxdepth 1 -type f | parallel -j1 do_one

如果启动php worker.php不需要很长时间,那么最后一个可能更可取,因为如果文件的大小非常不同,它会更均匀地分布,因此如果最后一个文件很大,您最终不会等待单个进程完成处理。

最新更新