在16核机器上流式传输大型(5gb)CSV的FAST(并行?)



在一台有16个内核的服务器机器上,我有30个CSV文件(保存为.txt文件),每个文件的大小从2GB到11GB不等。

  • 每个CSV的每一行都包含一个日期、时间和ID

我需要构建一个大小为datetime x ID(大约35000 x 2000)的密集矩阵,其中每个单元格都是具有该日期时间和ID的行数(因此每个CSV行的日期时间和标识都用作更新该矩阵的矩阵索引)。每个文件都包含一个唯一的日期时间范围,因此该作业在文件之间的并行性令人尴尬。

问题:什么是更快/最快的方法来实现这一点&(可能)将其并行化 我偏爱Python,但如果有更好的解决方案,我可以在C++中工作。我应该用MapReduce还是MPI重写?看看达斯还是潘达斯?以某种方式编译我的python脚本?完全是别的吗?

我目前的方法(我很乐意为了更快的东西而放弃): 目前,我正在Python中串行地(一次一个CSV)执行此操作,并将输出矩阵保存为h5格式。我使用从命令行逐行流式传输CSV

cat one_csv.txt | my_script.py > outputfile.h5

我的python脚本的工作方式如下:

# initialize matrix
…
for line in sys.stdin:
    # Split the line into data columns
    split =  line.replace('n','').split(',')
    ...(extract & process datetime; extract ID)...
    # Update matrix
    matrix[datetime, ID] = matrix[datetime, ID] +1

EDIT以下是其中一个CSV的几行示例。唯一相关的列是"dateYMDD"(格式化为"80101"表示2008年1月1日)、"时间"one_answers"ID"。因此,例如,代码应该读取——使用下面CSV的第一行将1添加到对应于(Jan_1_2008_00_00,12)的矩阵单元。

此外:与唯一ID相比,有更多的唯一时间,CSV是按时间排序的。

Type|Number|dateYMDD|time|ID
2|519275|80101|0:00:00|12
5|525491|80101|0:05:00|25
2|624094|80101|0:12:00|75
5|623044|80102|0:01:00|75
6|658787|80102|0:03:00|4

首先,您可能应该评测您的脚本,以确保瓶颈确实在您认为的地方。

也就是说,Python的全局解释器锁将使并行化变得困难,除非你使用多处理,我希望简单地单独处理它们并合并结果会更快:为每个Python脚本提供一个CSV并输出到一个表,然后合并表。如果表比CSV小得多(正如人们所期望的,如果单元具有高值),那么这应该是相对有效的。

不过,我不认为这会让你们像你提到的那样全速前进。如果这不能满足你的期望,我会考虑用C++、Rust或Cython编写它。

最新更新