我是Hadoop的新手,我想在每次运行时使用2,4,6个节点来分割要发送给映射器的数据集。但是我写的代码不能正常工作。事实上,它适用于2个节点,但随着节点数量的增加,一些输出数据会在输出文件中丢失。你能帮帮我吗?谢谢你
代码如下:
public static void main(String[] args) throws Exception {
System.out.println("MapReduce Started at:"+System.currentTimeMillis());
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
int numOfNodes = 2;
Job job = new Job(conf, "calculateSAAM");
job.setJarByClass(calculateSAAM.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("/home/helsene/wordcount/input"));
String outputFile = "/home/helsene/wordcount/output/";
long dataLength = fs.getContentSummary(new Path(outputFile)).getLength();
FileInputFormat.setMaxInputSplitSize(job, (dataLength / numOfNodes));
job.setNumReduceTasks(numOfNodes/2);
Path outPath = new Path(outputFile);
fs.delete(outPath, true);
FileOutputFormat.setOutputPath(job, new Path(outputFile));
job.waitForCompletion(true);
System.out.println("MapReduce ends at:"+System.currentTimeMillis());
}
}
每个reducer产生一个输出文件,默认命名为part-xxxxx
(part-00000
为第一个reducer, part-00001
为第二个reducer,等等)。
对于您的代码,当您有超过3个节点时,您将有多个reducer,因此输出数据将被分割成多个部分(多于一个文件)。这意味着一些单词计数将出现在第一个文件(part-00000)中,一些单词计数将出现在第二个文件(part-00001)中,等等。以后可以通过调用getmerge命令合并这些部分,如:
hadoop dfs -getmerge /HADOOP/OUTPUT/PATH /local/path/
并在指定的本地路径中获得一个文件,其中包含所有部分文件的合并结果。这个文件的结果将与您有两个节点时得到的文件相同,因此2/2 = 1个reducer(产生一个输出文件)。
顺便说一下,将还原剂的数量设置为numOfNodes/2
可能不是最佳选择。