有一个~1000万个GZipped CSV文件的集合,每个文件有100-1000行和>2000列。 每个文件还包含一个标头。
在每个CSV文件中,有两个重要的列,"ID"和"目标"。
我正在尝试删除具有重复"目标"的行,但保留要删除的行中的 ID,以及不会删除的行。
例如
输入:
CSV1
| ID | Target |
|-------|------------------------------|
| IX213 | C1=CC(=CC=C1CC(=O)C(=O)O)O |
| IX412 | CN1C=NC2=C1C(=O)N(C(=O)N2C)C |
CSV2
| ID | Target |
|-------|------------------------------|
| BC144 | CN1C=NC2=C1C(=O)N(C(=O)N2C)C |
| BC155 | C(CC(=O)O)C(C(=O)O)N |
输出:
CSV1*
| ID | Target |
|--------------|------------------------------|
| IX213 | C1=CC(=CC=C1CC(=O)C(=O)O)O |
| IX412; BC144 | CN1C=NC2=C1C(=O)N(C(=O)N2C)C |
CSV2*
| ID | Target |
|-------|------------------------------|
| BC155 | C(CC(=O)O)C(C(=O)O)N |
对于带有Pandas(Python)或类似产品的少量文件来说,这将是直截了当的,但希望有人可以有更好的方法来处理数百万个文件和数十亿个条目。
我会编写一个程序,通过一个压缩的csv文件并写入以下4列:target id file row
。
在每个文件上运行它,您将获得~1000万个小文件。
假设您不是以分布式方式执行此操作,接下来我将文件合并为一个大文件,并使用 unix 排序实用程序对其进行排序。 (警告您要执行LC_ALL=C sort foo.txt
,因为 C 语言环境更快,并且产生更明智的结果。 有关详细信息,请参阅排序未按预期排序(空格和区域设置)。
现在可以轻松处理该文件,并决定保留哪个文件。 您可以使用列file row target id is_keep removed_ids
写出文件。 请务必用前导零写出行,因此您可以写000042
而不是42
。removed_ids
是如果您保留此文件,则从其他文件中删除的文件。 (前导零的数量应该足以容纳最大的文件。 这就是使腹水顺序与数字顺序匹配。
再次对此文件进行排序,然后将其分解为每个文件
的文件。给定原始 gzip 文件和要保留哪些行的文件,以及要存储哪些 id(如果您保留它),很容易处理原始文件以删除/保留行并记录您删除的内容。 我强烈建议进行健全性检查以验证目标/id/行是否都匹配。 并且不要删除原件,除非通过健全性检查。
如果您是分布式处理的忠实粉丝,那么从排序到map-reduce的转换非常简单。 如果您有该设置的基础结构,则不妨使用它。 如果没有,我会建议使用这种排序文件方法,仅使用并行化来处理第一个/最后一个的所有单个文件。
尽管数据量可能看起来势不可挡,但我认为如果您保留所需的适量数据,您可以按顺序迭代所有文件。例如,您可以跟踪与唯一目标的关系,第一个ID具有该目标和ID别名的关系(例如,IDIX412
对应于BC144
)。这样,您的解决方案可能如下所示:
import csv
filenames = [...]
target_ids = {}
aliases = {}
for filename in filenames:
with open(filename, 'r') as file_in:
reader = csv.DictReader(file_in)
for row in reader:
if row['Target'] in target_ids:
aliases[row['ID']] = target_ids[row['Target']]
remove_row(row) # Do whatever you may require
else:
target_ids[row['Target']] = row['ID']
请注意,拥有 10M 键值对的dict
是完全可以处理的。
如果这仍然不适合内存,您可以使用shelve
而不是字典,以便将相应的数据存储在HDD中。您可以执行以下操作:
import csv
import shelve
filenames = [...]
with shelve.open('target_ids') as target_ids, shelve.open('aliases') as aliases:
for filename in filenames:
with open(filename, 'r') as file_in:
reader = csv.DictReader(file_in)
for row in reader:
if row['Target'] in target_ids:
aliases[row['ID']] = target_ids[row['Target']]
remove_row(row) # Do whatever you may require
else:
target_ids[row['Target']] = row['ID']
shelve
关于常规字典的缺点是速度。