I have been trying to implement the TF-IDF algorithm in Hadoop using MapReduce. My TF-IDF computation happens in 4 steps (I call them MR1, MR2, MR3, MR4). Here are my inputs/outputs:
MR1:(offset, line) ==(Map)==> (word|file, 1) ==(Reduce)==> (word|file, n)
MR2:(word|file, n) ==(Map)==> (file, word|n) ==(Reduce)==> (word|file, n|N)
MR3:(word|file, n|N) ==(Map)==> (word, file|n|N|1) ==(Reduce)==> (word|file, n|N|M)
MR4:(word|file, n|N|M) ==(Map)==> (word|file, n/N log D/M)
where n = number of occurrences of each distinct (word, file) pair, N = number of words in each file, M = number of documents each word appears in, and D = number of documents.
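Just to make the MR4 formula concrete, here is a quick back-of-the-envelope check with made-up counts (natural log assumed; the real counts of course come from the previous steps):

// Hypothetical figures, only to illustrate tf-idf = (n/N) * log(D/M):
// a word occurs n = 2 times in a file of N = 192 words, and the corpus
// holds D = 10 documents, M = 3 of which contain the word.
double tfidf = (2.0 / 192.0) * Math.log(10.0 / 3.0);   // ≈ 0.0125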
For MR1, I get the correct output, for example: hello|hdfs://..... 2
For MR2, I expect: hello|hdfs://....... 2|192
but instead I get: 2|hello|hdfs://...... 192|192
I am quite sure my code is correct: every time I try to append a marker string to my "value" to see what is going on in the reduce phase, that same string gets "teleported" into the key part.
Example: gg|word|hdfs://.... gg|192
Here is my MR1 code:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR1 {

    /* Mapper class:
     * Input  : (offset, line)
     * Output : (word|file, 1)
     * Sends 1 for each word per line.
     */
    static class MR1Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context contexte)
                throws IOException, InterruptedException {
            // Retrieve the name of the file associated with the split
            FileSplit split = (FileSplit) contexte.getInputSplit();
            String fileName = split.getPath().toString();

            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, "' \t:,;:!?./-_()[]{}\"&%<>");
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken().toLowerCase();
                contexte.write(new Text(word + "|" + fileName), new IntWritable(1));
            }
        }
    }

    /* Reducer class: counts the total number of occurrences per word/file
     * Input  : (word|file, x)
     * Output : (word|file, n)
     */
    public static class MR1Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context contexte)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            contexte.write(key, new IntWritable(sum));
        }
    }

    public static void main(String args[]) throws Exception {
        if (args.length != 2) {
            System.err.println(args.length + "(" + args[0] + "," + args[1] + ")");
            System.err.println("Usage : MR1 <source> <destination>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MR1.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MR1Mapper.class);
        job.setCombinerClass(MR1Reducer.class);
        job.setReducerClass(MR1Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Here is my MR2 code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR2 {

    /* Mapper: isolate the file name.
     * Input  : (word|file, n)
     * Output : (file, word|n)
     */
    static class MR2Mapper extends Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value, Context contexte)
                throws IOException, InterruptedException {
            String skey = key.toString();
            String word = skey.substring(0, skey.indexOf("|"));
            String fileName = skey.substring(skey.indexOf("|") + 1);
            contexte.write(new Text(fileName), new Text(word + "|" + value));
        }
    }

    /* Reducer: sums the number of occurrences of every word of the file
     * Input  : (file, word|n)
     * Output : (word|file, n|N)
     */
    public static class MR2Reducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context contexte)
                throws IOException, InterruptedException {
            int N = 0;

            // The values iterator can only be traversed once, so the values
            // are kept in an ArrayList in order to loop over them again.
            ArrayList<String> altmp = new ArrayList<String>();

            // 1st loop: compute the total number of words in the file (N)
            for (Text val : values) {
                String sval = val.toString();
                String sn = sval.substring(sval.indexOf("|") + 1);
                int n = Integer.parseInt(sn);

                altmp.add(val.toString());
                N += n;
            }

            // 2nd loop: emit (word|file, n|N) for every word of the file
            Iterator<String> it = altmp.iterator();
            while (it.hasNext()) {
                String val = it.next();
                String sval = val.toString();
                String word = sval.substring(0, sval.indexOf("|"));
                String sn = sval.substring(sval.indexOf("|") + 1);
                int n = Integer.parseInt(sn);

                // I tried to replace n with "gg" here, still same teleporting issue
                contexte.write(new Text(word + "|" + key.toString()), new Text(n + "|" + N));
            }
        }
    }

    public static void main(String args[]) throws Exception {
        if (args.length != 2) {
            System.err.println(args.length + "(" + args[0] + "," + args[1] + ")");
            System.err.println("Usage : MR2 <source> <destination>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MR2.class);

        // The HDFS file to use as input
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapperClass(MR2Mapper.class);
        job.setCombinerClass(MR2Reducer.class);
        job.setReducerClass(MR2Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Any help would be appreciated.
It is the Combiner's fault. In your driver class you specify that MR2Reducer should be used both as a Combiner and as a Reducer in the following commands:
job.setCombinerClass (MR2Reducer.class) ;
job.setReducerClass (MR2Reducer.class) ;
However, a Combiner runs within the scope of a single Map task, while the Reducer runs after all the Mappers have finished. By setting a Combiner you are essentially executing MR2Reducer right after each individual Mapper task, so it already computes N and splits the composite value of every key within the scope of that Mapper task. This effectively makes the Reduce phase start from input that follows the (word|file, n|N) key-value schema (i.e. the output of an MR2Reducer run before the Reduce phase), instead of the expected (file, word|n) schema. Since the reducer unknowingly works on the wrong schema, it splits the composite values in the wrong place, and the output key-value pairs end up looking shaky, wrong and/or reversed.
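To make the mix-up concrete, here is a rough trace of the data flow with the Combiner enabled, using the hello / 2 / 192 figures from your example (and assuming the whole file goes through a single map task, so the Combiner already sees all 192 words):

// MR2Mapper emits:               key = "hdfs://...file",        value = "hello|2"
// Combiner (MR2Reducer) emits:   key = "hello|hdfs://...file",  value = "2|192"
// Reducer (MR2Reducer) receives: key = "hello|hdfs://...file",  value = "2|192"
// but reduce() still parses the value as "word|n", so it reads
//   word = "2"   and   n = N = 192,
// and therefore writes
//   key   = word + "|" + key = "2|hello|hdfs://...file"
//   value = n + "|" + N      = "192|192"
// which is exactly the "teleported" output reported in the question.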
To deal with this, you could either:

- Create a custom Combiner that does what your current MR2Reducer does, and then change the MR2Reducer class to receive key-value pairs of the (word|file, n|N) schema (not recommended, since this will most likely negate the benefits in scalability and execution time and will only make your MapReduce job more complicated), or
- Simply delete or comment out the job.setCombinerClass (MR2Reducer.class) ; line from your driver class, to keep things simple and functional so you can build on it in the future (a minimal sketch of the resulting driver follows this list).
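For the second option, the driver's main() only loses the combiner line; a minimal sketch, reusing the rest of the job setup unchanged from your code:

public static void main(String args[]) throws Exception {
    if (args.length != 2) {
        System.err.println("Usage : MR2 <source> <destination>");
        System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MR2.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setInputFormatClass(KeyValueTextInputFormat.class);

    job.setMapperClass(MR2Mapper.class);
    // job.setCombinerClass(MR2Reducer.class);  // removed: MR2Reducer changes the
    //                                          // key/value schema, so it is not a valid Combiner
    job.setReducerClass(MR2Reducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}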
To showcase this, I ran your MR1 and MR2 classes locally on my machine, deleted the job.setCombinerClass (MR2Reducer.class) ; line, and used this input stored in HDFS to verify that the output key-value pairs are as desired. Here is a snippet of the output after the execution:
balance|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
suppress|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
back|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 4|661
after|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661
suspicious|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swang|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swinging|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 1|661