Weird behavior in MapReduce: values get overwritten

I have been trying to implement the TF-IDF algorithm with MapReduce in Hadoop. My TF-IDF runs in 4 steps (I call them MR1, MR2, MR3, MR4). Here are my inputs/outputs:

MR1:(offset, line) ==(Map)==> (word|file, 1) ==(Reduce)==> (word|file, n)

MR2:(word|file, n) ==(Map)==> (file, word|n) ==(Reduce)==> (word|file, n|N)

MR3:(word|file, n|N) ==(Map)==> (word, file|n|N|1) ==(Reduce)==> (word|file, n|N|M)

MR4:(word|file, n|N|M) ==(Map)==> (word|file, n/N log D/M)

where n = the number of occurrences of each distinct (word, file) pair, N = the number of words in each file, M = the number of documents in which each word appears, and D = the number of documents.
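(For reference, a minimal sketch of what the MR4 map step could look like, assuming MR3's output is read with KeyValueTextInputFormat and D is passed in through the job Configuration under a hypothetical property name "tfidf.docCount":

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical MR4 mapper: input (word|file, n|N|M), output (word|file, tfidf).
// Assumes the total document count D was stored in the Configuration under a
// made-up property name "tfidf.docCount" before job submission.
public class MR4Mapper extends Mapper<Text, Text, Text, Text> {
    private long D;

    @Override
    protected void setup(Context contexte) {
        D = contexte.getConfiguration().getLong("tfidf.docCount", 1);
    }

    @Override
    public void map(Text key, Text value, Context contexte)
            throws IOException, InterruptedException {
        // value is "n|N|M"
        String[] parts = value.toString().split("\\|");
        double n = Double.parseDouble(parts[0]);
        double N = Double.parseDouble(parts[1]);
        double M = Double.parseDouble(parts[2]);
        double tfidf = (n / N) * Math.log(D / M);
        contexte.write(key, new Text(String.valueOf(tfidf)));
    }
}
)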

In the MR1 phase I get the correct output, e.g.: hello|hdfs://..... 2

For the MR2 phase I expect: hello|hdfs://....... 2|192, but instead I get: 2|hello|hdfs://...... 192|192

I am fairly sure my code is correct. Every time I try to add a marker to my "value" to see what is going on in the reduce phase, the same value gets "teleported" into the key part.

Example: gg|word|hdfs://.... gg|192

Here is my MR1 code:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR1 {
    /* Map class:
     *  Input:  (offset, line)
     *  Output: (word|file, 1)
     *  Sends 1 for each word per line.
     */
    static class MR1Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context contexte)
                throws IOException, InterruptedException {
            // Retrieve the name of the file associated with the split
            FileSplit split = (FileSplit) contexte.getInputSplit();
            String fileName = split.getPath().toString();

            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, " \t:,;:!?./-_()[]{}\"&%<>");
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken().toLowerCase();
                contexte.write(new Text(word + "|" + fileName), new IntWritable(1));
            }
        }
    }

    /* Reducer class: counts the total number of occurrences per word/file
     * Input:  (word|file, x)
     * Output: (word|file, n)
     */
    public static class MR1Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context contexte)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            contexte.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage : MR1 <source> <destination>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MR1.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MR1Mapper.class);
        job.setCombinerClass(MR1Reducer.class);
        job.setReducerClass(MR1Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
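I run MR1 the usual way, for example (assuming the classes are packaged into a hypothetical tfidf.jar):

hadoop jar tfidf.jar MR1 <source> <destination>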

And here is my MR2 code:

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR2 {
    /* Map: isolate the file name.
     * Input:  (word|file, n)
     * Output: (file, word|n)
     */
    static class MR2Mapper extends Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value, Context contexte)
                throws IOException, InterruptedException {
            String skey = key.toString();
            String word = skey.substring(0, skey.indexOf("|"));
            String fileName = skey.substring(skey.indexOf("|") + 1);
            contexte.write(new Text(fileName), new Text(word + "|" + value));
        }
    }

    /* Reduce: sum the number of occurrences of every word in the file
     * Input:  (file, word|n)
     * Output: (word|file, n|N)
     */
    public static class MR2Reducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context contexte)
                throws IOException, InterruptedException {
            int N = 0;

            // The values iterator can only be consumed once, so the values
            // are kept in an ArrayList to be traversed a second time.
            ArrayList<String> altmp = new ArrayList<String>();

            // First pass: compute the total number of words in the file
            for (Text val : values) {
                String sval = val.toString();
                String sn = sval.substring(sval.indexOf("|") + 1);
                int n = Integer.parseInt(sn);

                altmp.add(sval);
                N += n;
            }

            // Second pass: emit (word|file, n|N) for each word
            for (String sval : altmp) {
                String word = sval.substring(0, sval.indexOf("|"));
                String sn = sval.substring(sval.indexOf("|") + 1);
                int n = Integer.parseInt(sn);

                // I tried to replace n with "gg" here, still same teleporting issue
                contexte.write(new Text(word + "|" + key.toString()), new Text(n + "|" + N));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage : MR2 <source> <destination>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MR2.class);

        // HDFS input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapperClass(MR2Mapper.class);
        job.setCombinerClass(MR2Reducer.class);
        job.setReducerClass(MR2Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Any help would be appreciated.

It's the Combiner's fault. You are specifying in the driver class that you want to use MR2Reducer both as a Combiner and as a Reducer:

job.setCombinerClass(MR2Reducer.class);
job.setReducerClass(MR2Reducer.class);

However, a Combiner runs within the scope of a single Map instance, while the Reducer runs serially after all the Mappers have finished. By setting a Combiner, you are actually executing MR2Reducer right after each individual Mapper task, so it computes N and splits the composite values of each given key within the scope of that one Mapper task. And since a Combiner's output is handed to the Reducer as if it came straight from the Map phase, a Combiner must preserve the Map output key-value schema, which MR2Reducer does not.

This essentially causes the Reduce phase to start with input following the (word|file, n|N) key-value schema (i.e., the output of the MR2Reducer tasks that ran as Combiners before the Reduce phase) instead of the expected (file, word|n) schema. By unknowingly parsing the wrong schema, the reducer splits the composite values incorrectly, and the output key-value pairs end up looking shuffled, wrong and/or reversed.

To fix this, you can either:

  • create a custom Combiner that has the same logic as your current MR2Reducer and then change the MR2Reducer class to receive key-value pairs in the (word|file, n|N) schema (not recommended, as it will most likely cancel out any benefits in scalability and execution time while making your MapReduce job more complicated), or
  • delete or comment out the job.setCombinerClass(MR2Reducer.class); line in your driver class to keep things simple and functional, so that you can build on it in the future; if you later do need a Combiner here, see the sketch after this list
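For illustration, a Combiner that respects the contract would have to consume and emit the very same (file, word|n) schema that MR2Mapper produces, for instance by merging the counts of duplicate words within a single map task. A minimal, hypothetical sketch (MR2Combiner is my own name, not part of your job):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical schema-preserving combiner for MR2 (illustration only):
// both input and output are (file, word|n), so the reducer still receives
// the schema it expects; the combiner merely merges duplicate words
// emitted within a single map task.
public class MR2Combiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context contexte)
            throws IOException, InterruptedException {
        // Sum the partial counts of each word seen by this map task.
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (Text val : values) {
            String sval = val.toString();
            String word = sval.substring(0, sval.indexOf("|"));
            int n = Integer.parseInt(sval.substring(sval.indexOf("|") + 1));
            Integer previous = counts.get(word);
            counts.put(word, previous == null ? n : previous + n);
        }
        // Re-emit in the same (file, word|n) schema the mapper produced.
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            contexte.write(key, new Text(e.getKey() + "|" + e.getValue()));
        }
    }
}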

To showcase this, I used your MR1 and MR2 classes locally on my machine, deleted the job.setCombinerClass(MR2Reducer.class); line, and used this input stored in HDFS to verify that the output key-value pairs were as desired. Here is a snippet of the output after execution:

balance|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt    1|661
suppress|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt   1|661
back|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt       4|661
after|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt      1|661
suspicious|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt 2|661
swang|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt      2|661
swinging|hdfs://localhost:9000/user/crsl/metamorphosis/05.txt   1|661
