背景:
我正在尝试使用MapReduce在Hadoop上用Java制作一个"文档术语"矩阵。文档术语矩阵就像一个巨大的表格,每一行代表一个文档,每一列代表一个可能的单词/术语。
问题说明:
假设我已经有了一个术语索引列表(这样我就知道哪个术语与哪个列编号相关联),那么在每个文档中查找每个术语的索引的最佳方法是什么,这样我就可以逐行(即逐个文档)构建矩阵了?
到目前为止,我可以想到两种方法:
方法#1:
将术语索引列表存储在Hadoop分布式文件系统上。每次映射器读取新文档进行索引时,都会生成一个新的MapReduce作业——文档中每个唯一单词对应一个作业,每个作业都会在分布式术语列表中查询其术语。这种方法听起来有些过头了,因为我猜开始一份新工作会带来一些开销,而且这种方法可能需要数千万个工作岗位。此外,我不确定是否可以在另一个MapReduce作业中调用MapReduce。
方法#2:
将术语索引列表附加到每个文档中,以便每个映射器最终获得术语索引列表的本地副本。这种方法在存储方面相当浪费(术语索引列表的副本数量与文档数量一样多)。此外,我不知道如何将术语索引列表与每个文档合并——我是在映射器中还是在reducer中合并它们?
问题更新1
输入文件格式:
输入文件将是一个CSV(逗号分隔值)文件,包含所有文档(产品评论)。文件中没有列标题,但每个评论的值按以下顺序显示:product_id、review_id、review、stars。下面是一个假的例子:
"产品A","1","产品A非常非常昂贵。","2"
"产品G","2","真棒产品!!","5"
术语索引文件格式:
术语索引文件中的每一行都由以下内容组成:一个索引号、一个制表符,然后是一个单词。每个可能的单词在索引文件中只列出一次,因此术语索引文件类似于SQL表的主键(单词)列表。对于特定文档中的每个单词,我的初步计划是遍历术语索引文件的每一行,直到找到该单词。然后,该单词的列编号被定义为与该单词相关联的列/术语索引。以下是术语索引文件的示例,该文件是使用前面提到的两个示例产品评论构建的。
1个很棒的
2产品
3 a
4是
5非常
6昂贵的
输出文件格式:
我希望输出为"矩阵市场"(MM)格式,这是压缩具有多个零的矩阵的行业标准。这是理想的格式,因为大多数评论只包含所有可能单词的一小部分,所以对于特定的文档,只需要指定非零列。
MM格式的第一行有三个制表符分隔的值:文档总数、单词列总数和MM文件中不包括标题的总行数。在标题之后,每一行都包含与特定条目相关联的矩阵坐标,以及条目的值,顺序为:reviewID、wordColumnID、entry(该单词在评论中出现的次数)。有关矩阵市场格式的更多详细信息,请参阅此链接:http://math.nist.gov/MatrixMarket/formats.html.
每个评审的ID将等于文档术语矩阵中的行索引。这样,我就可以在矩阵市场格式中保留评论的ID,这样我仍然可以将每条评论与其星级联系起来。我的最终目标——这超出了这个问题的范围——是建立一种自然语言处理算法,根据文本预测新评论中的恒星数量。
使用上面的例子,最终的输出文件看起来是这样的(我不能让Stackoverflow显示制表符而不是空格):
2 6 7
1 2 1
1 3 1
1 4 1
1 5 2
1 6 1
2 1 1
2 2 1
好吧,您可以使用类似于反向索引概念的东西。
我建议这样做是因为,我认为这两个文件都很大。因此,像一对一那样相互比较将是真正的性能瓶颈。
这里有一种可以使用的方法-
您可以将输入文件格式csv文件(如datafile1、datafile2)和术语索引文件(如term_index_File)作为作业的输入。
然后在每个映射器中,您过滤源文件名,类似于以下内容-
映射器的伪代码-
map(key, row, context){
String filename= ((FileSplit)context.getInputSplit()).getPath().getName();
if (filename.startsWith("datafile") {
//split the review_id, words from row
....
context.write(new Text("word), new Text("-1 | review_id"));
} else if(filename.startsWith("term_index_file") {
//split index and word
....
context.write(new Text("word"), new Text("index | 0"));
}
}
例如来自不同映射器的输出
Key Value source
product -1|1 datafile
very 5|0 term_index_file
very -1|1 datafile
product -1|2 datafile
very -1|1 datafile
product 2|0 term_index_file
...
...
说明(示例):正如它清楚地显示的那样,关键字将是您的单词,值将由两部分组成,用分隔符分隔"|"
如果源是一个数据文件,则发出key=product和value=-1|1,其中-1是一个伪元素,1是review_id。
如果源是term_index_file,则发出key=product和value=2|0,其中2是单词'product'的索引,0是伪review_id,我们将使用它进行排序,稍后将对此进行解释。
毫无疑问,如果我们将term_index_file作为作业的正常输入文件提供,那么两个不同的映射器将不会处理重复索引。因此,'product,varie'或术语_index_file中的任何其他索引词将仅对一个映射器可用。请注意,这仅对term_index_file有效,对数据文件无效。
下一步:
Hadoop mapreduce框架,正如您可能知道的,将按键分组所以,你会有这样的东西去不同的减速器,
reduce-1: key=product, value=<-1|1, -1|2, 2|0>
reduce-2: key=very, value=<5|0, -1|1, -1|1>
但是,在上述情况下,我们有一个问题。我们希望在'|'之后的值中进行排序,即在reduce-1 -> 2|0, -1|1, -1|2 and in reduce-2 -> <5|0, -1|1, -1|1>
中
为了实现这一点,您可以使用使用排序比较器实现的辅助排序。请在谷歌上搜索,但这里有一个链接可能会有所帮助。在这里提及它可能会非常冗长
在每个reduce-1中,由于值如上所述排序,当我们开始迭代时,我们将在第一次迭代中获得"0",并获得index_id=2的值,然后可以用于后续迭代。在接下来的两次迭代中,我们连续获得评审ID 1和2,并使用计数器,这样我们就可以跟踪任何重复的评审ID。当我们得到重复的review_id时,这意味着一个单词在同一review_ids行中出现了两次。只有当我们找到一个不同的review_id并发出特定index_id的前一个review_id详细信息时,我们才会重置计数器,类似于以下内容-
previous_review_id + "t" + index_id + "t" + count
当循环结束时,我们将剩下一个previous_review_id,我们最终以相同的方式发出它。
减缩器的伪代码-
reduce(key, Iterable values, context) {
String index_id = null;
count = 1;
String previousReview_id = null;
for(value: values) {
Split split[] = values.split("\|");
....
//when consecutive review_ids are same, we increment count
//and as soon as the review_id differ, we emit, reset the counter and print
//the previous review_id detected.
if (split[0].equals("-1") && split[1].equals(previousReview_id)) {
count++;
} else if(split[0].equals("-1") && !split[1].equals(prevValue)) {
context.write(previousReview_id + "t" + index_id + "t" + count);
previousReview_id = split[1];//resting with new review_id id
count=1;//resetting count for new review_id
} else {
index_id = split[0];
}
}
//the last previousReview_id will be left out,
//so, writing it now after the loop completion
context.write(previousReview_id + "t" + index_id + "t" + count);
}
这项工作是用多个减速器完成的,目的是利用Hadoop实现其最著名的性能,因此,最终输出将分散,与您想要的输出不同,如下所示。
1 4 1
2 1 1
1 5 2
1 2 1
1 3 1
1 6 1
2 2 1
但是,如果您希望根据review_id(作为您想要的输出)对所有内容进行排序,则可以使用单个reductor和previos作业的输出作为输入,再编写一个作业来完成这项工作。同时计算2 6 7并将其放在输出的前面。
我认为,这只是一种可能对你有所帮助的方法(或想法)。你肯定想修改它,提出一个更好的算法,并以你认为有益的方式使用它。
与使用分隔符(如"|")相比,还可以使用复合键以获得更好的清晰度。
我愿意接受任何澄清。请询问你是否认为,这可能对你有用。
谢谢!
您可以在Hadoop分布式缓存中加载术语索引列表,以便映射器和还原器可以使用它。例如,在Hadoop流中,您可以按如下方式运行作业:
$ hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar
-input myInputDirs
-output myOutputDir
-mapper myMapper.py
-reducer myReducer.py
-file myMapper.py
-file myReducer.py
-file myTermIndexList.txt
现在,在myMapper.py中,您可以加载文件myTermIndexList.txt并将其用于您的目的。如果你对你的输入和期望的输出进行更详细的描述,我可以给你更多的细节。
如果您没有太多hadoop经验,方法#1不是很好,但非常常见。开始工作是非常昂贵的。你想做的是有2-3个工作,相互滋养,以获得想要的结果。类似问题的常见解决方案是让映射器将输入和输出对标记化,将它们分组在执行某种计算的reducer中,然后将其输入到作业2中。在作业2中的映射器中,您可以以某种方式反转数据,并在reducer中进行其他计算。
我强烈建议通过培训课程学习更多关于Hadoop的知识。有趣的是,Cloudera的开发课程有一个与您试图解决的问题非常相似的问题。或者,除了课程之外,我还会看"MapReduce的数据密集型文本处理",特别是"计算相对频率"one_answers"文本检索的反向索引"部分
http://lintool.github.io/MapReduceAlgorithms/MapReduce-book-final.pdf