我正在研究类似于经典MapReduce示例的单词计数,但有一点不同,我希望只得到Top N结果。
假设我在HDFS中有一组非常大的文本数据。有很多例子展示了如何构建HadoopMapReduce作业,该作业将为文本中的每个单词提供字数统计。例如,如果我的语料库是:
"这是对测试数据的测试,也是测试的好方法">
标准MapReduce字数作业的结果集为:
test:3,a:2,this:2,is:1等。
但是如果我ONLY想要获得在我的整个数据集中使用的前三个单词呢?
我仍然可以运行完全相同的标准MapReduce单词计数作业,然后在它准备好并吐出每个单词的计数后,只获取前3个结果,但这似乎有点低效,因为在洗牌阶段需要移动大量数据。
我想的是,如果这个样本足够大,并且数据在HDFS中随机且分布良好,那么每个Mapper不需要将其所有字数发送给Reducer,而是只需要发送一些顶部数据。因此,如果一个映射器有这个:
a:8234,the:5422,man:4352更多的单词,稀有词:1,怪异词:1等
然后我想做的是只将每个Mapper的前100个左右的单词发送到Reducer阶段,因为当一切都说了又做了之后,"稀有单词"突然进入前3名的可能性很小。这似乎可以节省带宽和Reducer处理时间。
这可以在组合器阶段完成吗?这种在混洗阶段之前的优化通常会完成吗?
这是一个非常好的问题,因为您已经发现Hadoop的字数示例效率低下。
优化问题的技巧如下:
在本地映射阶段进行基于HashMap
的分组,也可以使用组合器。这可能是这样的,我使用的是番石榴的HashMultiSet
,它提供了一个很好的计数机制。
public static class WordFrequencyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private final HashMultiset<String> wordCountSet = HashMultiset.create();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\s+");
for (String token : tokens) {
wordCountSet.add(token);
}
}
你在清理阶段会发出结果:
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
Text key = new Text();
LongWritable value = new LongWritable();
for (Entry<String> entry : wordCountSet.entrySet()) {
key.set(entry.getElement());
value.set(entry.getCount());
context.write(key, value);
}
}
因此,您将单词分组到本地工作块中,从而通过使用一点RAM来减少网络使用。您也可以对Combiner
执行同样的操作,但它是按组排序的,因此这将比使用HashMultiset
慢(尤其是对于字符串!)。
要获得Top N,您只需要将本地HashMultiset
中的Top N写入输出收集器,并在reduce端以正常方式聚合结果。这也为您节省了大量的网络带宽,唯一的缺点是您需要在清理方法中对单词计数元组进行排序。
代码的一部分可能看起来像这样:
Set<String> elementSet = wordCountSet.elementSet();
String[] array = elementSet.toArray(new String[elementSet.size()]);
Arrays.sort(array, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// sort descending
return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
}
});
Text key = new Text();
LongWritable value = new LongWritable();
// just emit the first n records
for(int i = 0; i < N, i++){
key.set(array[i]);
value.set(wordCountSet.count(array[i]));
context.write(key, value);
}
希望你能理解在本地做尽可能多的单词的要点,然后只聚合前N个单词中的前N个;)
引用Thomas
要获得前N名,您只需要将前N名写入本地HashMultiset到输出收集器并聚合结果以你正常的方式减少。这为您节省了大量网络带宽,唯一的缺点是您需要对清理方法中的字数元组。
如果您只在本地HashMultiset中写入前N个元素,那么您可能会错过某个元素的计数,如果从该本地HashMultiset传递,该元素可能会成为前10个元素之一。
例如,将以下格式视为三个映射:MapName:elementName,elementcount:
地图A:Ele1,4:Ele2,5:Ele3,5:Ele4,2
地图B:Ele1,1:Ele5,7:Ele6,3:Ele7,6
地图C:Ele5,4:Ele8,3:Ele1,1:Ele9,3
现在,如果我们考虑每个映射器的前3个,我们将错过元素"Ele1",该元素的总数本应为6,但由于我们正在计算每个映射器前3个的总数,我们看到"Ele1"的总数为4。
我希望这是有道理的。请告诉我你对此有何看法。