我有大约200万条记录,其中有大约4个字符串字段,每个字段需要检查重复。更具体地说,我有姓名,电话,地址和父亲的名字作为字段,我必须检查重复使用所有这些字段与其余的数据。生成的唯一记录需要记录到db.
我已经能够实现mapreduce,所有记录的迭代。任务速率设置为100/s, bucket-size设置为100。计费启用。
目前,一切都在工作,但性能非常非常慢。我已经能够在6小时内完成10,000条记录的测试数据集中仅1000条记录的重复数据删除处理。
当前在java中的设计是:
- 在每次映射迭代中,我将当前记录与
- 前一条记录是db中的一条记录,其作用类似于全局记录变量,我用每个映射中的另一个先前记录覆盖它迭代
- 比较使用一个算法完成,结果被写为新建实体到db
- 在一个Mapreduce作业结束时,我以编程方式创建另一个Mapreduce作业
- 上一个记录变量有助于作业与下一个进行比较候选记录与其余数据
为了在最短的时间内实现这一目标,我准备增加任何数量的GAE资源。
我的问题是:
- 重复数据删除(检查重复数据)的准确性是否会受到以下因素的影响并行的工作/任务吗?
- 该设计如何改进?
- 这将扩展到2000万条记录吗 读/写变量(不仅仅是计数器)最快的方式是什么?可以在一个mapreduce作业中使用。
最欢迎自由职业者来协助。
谢谢你的帮助。
您应该利用Reducer对每个字段执行相当于sort -u的操作。你需要在每个领域做一个M/R工作。你会让你要比较的字段成为映射器中的键,然后在reducer中你会得到所有相同名称的记录分组在一起,你可以标记它们。第二关是电话,等等。根据你的集群大小,每次传递应该非常快。
编辑:@Olaf指出OP可能想要完全独特的记录。使用多部分键,这可以是一行hadoop流命令来获得唯一集。我很快会加进去的。
Edit2:承诺的流式命令,将对整个文件执行排序-u。这假设您有一个文件,其中每个字段(姓名,父亲姓名,电话号码和地址)在dir hdfs://example/dedup/input/中的一个或多个文件中每行一个选项卡分隔。实际的hdfs路径可以是任何东西,也可以使用单个文件。输出将是hdfs://example/dedup/output/中的多个part-*文件。您可能还需要更改命令,因为hadoop-streaming.jar可能位于稍微不同的位置。如果您有超过4个字段,则更改stream.num.map.output.key.fields的值。
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar
-input hdfs://example/dedup/input/ -output hdfs://example/dedup/output/
-mapper org.apache.hadoop.mapred.lib.IdentityMapper
-reducer /usr/bin/uniq
-D stream.num.map.output.key.fields=4
要在本地文件系统文件中检索唯一的结果,运行如下命令:
$HADOOP_HOME/bin/hadoop fs -cat
'hdfs://example/dedup/output/part-*' > results.txt
需要注意的是,由于每一列都是一个键流,因此将添加一个空值,因此每行将在末尾有一个额外的选项卡。这很容易被剥掉。
如果您想做的不仅仅是获得uniq输出,您可以放入您自己的java类或命令行程序,而不是使用/usr/bin/uniq例如,该类可以通过在输入中添加第五列(即记录的DB ID)来更新发现重复的所有记录。默认情况下,Hadoop按整个键对结果进行分区,因此每组重复记录将在一个reducer中一起流式传输,并且所有这些都将并行进行。有关更多信息,请查看流媒体文档
我认为有两种方法可以解决这个问题:
-
(如果你只需要这样做一次)AppEngine为你的实体中的每个属性创建一个属性索引(除非你要求它不要这样做)。创建后端,使用游标批量运行查询"SELECT * FROM ORDER BY",确定重复属性并修复/删除。你可能能够并行化它,但它在分片边界上很棘手,你可能不得不自己编写所有的代码。
-
你可以使用mapper框架来做得慢一些,但是并行运行。这种方法还允许您在插入时有效地删除数据。引入一个新的实体来保存唯一的属性值。说"UniquePhoneNumber"。实体应该保存一个电话号码作为键和对该电话号码的实体的引用。现在运行一个map并查找UniquePhoneNumber。如果找到它并且它的引用有效,则删除副本。如果没有,创建一个具有正确引用的新文件。这样,如果需要的话,甚至可以将一个引用重新指向另一个引用。确保您读取了UniquePhoneNumber,并在单个事务中创建了一个新的/更新了一个新的。
为每条记录生成一个哈希码。循环遍历记录并根据哈希码将每个记录插入到Set
中。Set
现在是O(N)的重复数据删除列表。
您绝对不应该使用当前的方法—一次只有一个进程可以更新一个实体,因此您的整个mapreduce在该实体上存在瓶颈。此外,mapreduce目前不允许指定结果集的顺序,因此不能保证找到所有(甚至大多数)重复项。
目前,您最好的选择可能是构建自己的解决方案。使用游标,对按要重复数据删除的字段排序的类型执行查询,并对其进行扫描,检查重复项并在遇到重复项时删除它们(分批删除,以减少rpc)。当您需要链接另一个任务时(由于10分钟的任务限制),请使用光标确保新任务从您离开的地方开始。
如果你想并行化它,你可以这样做,让每个shard从跳过记录开始,直到它发现你正在重复数据删除的值发生了变化,然后从那里开始。在一个分片的末尾,等待到达一个组的末尾,然后停止。这样,您就可以确保不会错过删除位于分片边界边缘的重复项。
这是一个基于Map Reduce的散列自连接的解决方案。利用编辑距离算法进行模糊重复匹配。您可以从记录中选择要用于重复检测的字段。reducer将输出一个重复的分数。
https://pkghosh.wordpress.com/2013/09/09/identifying-duplicate-records-with-fuzzy-matching/