我有大量摄入的设备数据,其中包含重复数据。我还有一个单独的摄入元数据历史记录列表(用于唯一标识摄入的文件)。我希望通过历史记录消除我摄入的设备数据的重复。这个历史文件不小,因此我不想在内存中加载它。我也考虑过Reduce侧连接,但这将通过网络传递大量数据。
Bloom Filter是我正在考虑的缩小历史文件大小的东西。但它给了我相反的结果,即它可能会报告我有一个副本,而我没有。
重复数据消除似乎是一个相当常见的问题,我想看看是否有其他人有可能的想法。
如果要使用Map/Reduce进行重复数据消除,并且要使用多台机器执行该任务,则必须通过网络发送所有数据。Hadoop就是这么做的!
当然,你也可以在一台机器上运行所有的东西,这只需要更长的时间。从核心来看,重复数据消除是Hadoop自然会做的事情之一,您可以免费获得大部分功能:Hadoop在Map步骤中对所有"密钥"进行散列,并确保属于"密钥"的所有"值"最终都在同一个Reducer上。
任务本身相当简单,实际上它几乎与WordCount示例(最简单的Map/Reduce作业之一)相同。只需跳过输出计数,只输出键(使用NullWritable作为值)。我在下面包含了map和reduce函数。注意:如果您使用N台多台机器作为Reducer,则需要将每个Reducer的N个输出文件连接起来,以返回一个文件。这是代码:
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString(); //process your data here
context.write(line, NullWritable.get());
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
编辑1:如果你想按照另一个答案的建议使用组合器,你可以很容易地做到。Combinater在数据通过网络发送之前运行,您可以将其视为本地Reducer。只需设置
job.setCombinerClass(Reduce.class);
其中,Reduce是包含Reduce()方法的类。
编辑2:根据我收到的一个建议:如果你只需要处理字符串,根本不需要进行任何处理,那么value.toString()
是多余的,也不需要。然后,您可以稍微简化Mapper
:
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
不要忘记,如果您有很多重复项,那么组合器是减少网络流量的最佳方法,足以使集群中的单个主机已经有很多重复。