删除数百万个压缩 CSV 文件中的重复行,同时保留重复行中的一条信息



有一个~1000万个GZipped CSV文件的集合,每个文件有100-1000行和>2000列。 每个文件还包含一个标头。

在每个CSV文件中,有两个重要的列,"ID"和"目标"。

我正在尝试删除具有重复"目标"的行,但保留要删除的行中的 ID,以及不会删除的行。

例如

输入:

CSV1
|   ID  |  Target                      |
|-------|------------------------------|
| IX213 | C1=CC(=CC=C1CC(=O)C(=O)O)O   |
| IX412 | CN1C=NC2=C1C(=O)N(C(=O)N2C)C |
CSV2
|   ID  |  Target                      |
|-------|------------------------------|
| BC144 | CN1C=NC2=C1C(=O)N(C(=O)N2C)C |
| BC155 | C(CC(=O)O)C(C(=O)O)N         |

输出:

CSV1*
|   ID         |  Target                      |
|--------------|------------------------------|
| IX213        | C1=CC(=CC=C1CC(=O)C(=O)O)O   |
| IX412; BC144 | CN1C=NC2=C1C(=O)N(C(=O)N2C)C |
CSV2*
|   ID  |  Target                      |
|-------|------------------------------|
| BC155 | C(CC(=O)O)C(C(=O)O)N         |

对于带有Pandas(Python)或类似产品的少量文件来说,这将是直截了当的,但希望有人可以有更好的方法来处理数百万个文件和数十亿个条目。

我会编写一个程序,通过一个压缩的csv文件并写入以下4列:target id file row

在每个文件上运行它,您将获得~1000万个小文件。

假设您不是以分布式方式执行此操作,接下来我将文件合并为一个大文件,并使用 unix 排序实用程序对其进行排序。 (警告您要执行LC_ALL=C sort foo.txt,因为 C 语言环境更快,并且产生更明智的结果。 有关详细信息,请参阅排序未按预期排序(空格和区域设置)。

现在可以轻松处理该文件,并决定保留哪个文件。 您可以使用列file row target id is_keep removed_ids写出文件。 请务必用前导零写出行,因此您可以写000042而不是42removed_ids是如果您保留此文件,则从其他文件中删除的文件。 (前导零的数量应该足以容纳最大的文件。 这就是使腹水顺序与数字顺序匹配。

再次对此文件进行排序,然后将其分解为每个文件

的文件。给定原始 gzip 文件和要保留哪些行的文件,以及要存储哪些 id(如果您保留它),很容易处理原始文件以删除/保留行并记录您删除的内容。 我强烈建议进行健全性检查以验证目标/id/行是否都匹配。 并且不要删除原件,除非通过健全性检查。


如果您是分布式处理的忠实粉丝,那么从排序到map-reduce的转换非常简单。 如果您有该设置的基础结构,则不妨使用它。 如果没有,我会建议使用这种排序文件方法,仅使用并行化来处理第一个/最后一个的所有单个文件。

尽管数据量可能看起来势不可挡,但我认为如果您保留所需的适量数据,您可以按顺序迭代所有文件。例如,您可以跟踪与唯一目标的关系,第一个ID具有该目标和ID别名的关系(例如,IDIX412对应于BC144)。这样,您的解决方案可能如下所示:

import csv
filenames = [...]
target_ids = {}
aliases = {}
for filename in filenames:
with open(filename, 'r') as file_in:
reader = csv.DictReader(file_in)
for row in reader:
if row['Target'] in target_ids:
aliases[row['ID']] = target_ids[row['Target']]
remove_row(row)  # Do whatever you may require
else:
target_ids[row['Target']] = row['ID']

请注意,拥有 10M 键值对的dict是完全可以处理的。

如果这仍然不适合内存,您可以使用shelve而不是字典,以便将相应的数据存储在HDD中。您可以执行以下操作:

import csv
import shelve
filenames = [...]
with shelve.open('target_ids') as target_ids, shelve.open('aliases') as aliases:
for filename in filenames:
with open(filename, 'r') as file_in:
reader = csv.DictReader(file_in)
for row in reader:
if row['Target'] in target_ids:
aliases[row['ID']] = target_ids[row['Target']]
remove_row(row)  # Do whatever you may require
else:
target_ids[row['Target']] = row['ID']

shelve关于常规字典的缺点是速度。

相关内容

最新更新