How can I have multiple mappers and reducers?


I have this code, which sets up one mapper and one reducer. I want to add another mapper and another reducer to do further processing. The problem is that the output file of the first MapReduce job has to be the input of the next MapReduce job. Is this possible? If so, how do I do it?

public int run(String[] args) throws Exception {
  JobConf conf = new JobConf(getConf(), DecisionTreec45.class);
  conf.setJobName("c4.5");

  // the keys are words (strings)
  conf.setOutputKeyClass(Text.class);
  // the values are counts (ints)
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(MyMapper.class);
  conf.setReducerClass(MyReducer.class);

  // set your input file path below
  FileInputFormat.setInputPaths(conf, "/home/hduser/Id3_hds/playtennis.txt");
  FileOutputFormat.setOutputPath(conf, new Path("/home/hduser/Id3_hds/1/output" + current_index));

  JobClient.runJob(conf);
  return 0;
}

Yes, this can be done. You can look at the following tutorial to see how the chaining works: http://gandhigeet.blogspot.com/2012/12/as-discussed-in-previous-post-hadoop.html

Make sure you delete the intermediate output data that each MR stage creates in HDFS, e.g. with fs.delete(intermediateOutputPath, true);

Here is how this works.

You need two jobs, where Job 2 depends on Job 1.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainJobs extends Configured implements Tool {

 private static final String OUTPUT_PATH = "intermediate_output";

 @Override
 public int run(String[] args) throws Exception {
  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);
  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
  // Block until Job 1 finishes; Job 2 depends on its output,
  // so execution only moves on to the next statement afterwards.
  if (!job.waitForCompletion(true)) {
   return 1;
  }

  /*
   * Job 2: its input is the intermediate output written by Job 1
   */
  Configuration conf2 = getConf();
  Job job2 = new Job(conf2, "Job 2");
  job2.setJarByClass(ChainJobs.class);
  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);
  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);
  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);
  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));
  boolean success = job2.waitForCompletion(true);

  // Clean up the intermediate data once Job 2 is done with it.
  fs.delete(new Path(OUTPUT_PATH), true);
  return success ? 0 : 1;
 }

 /**
  * Reads the input and output locations from the command line
  * and runs the chained jobs to completion.
  */
 public static void main(String[] args) throws Exception {
  if (args.length != 2) {
   System.err.println("Usage: ChainJobs <inputdirectory> <outputlocation>");
   System.exit(1);
  }
  System.exit(ToolRunner.run(new Configuration(), new ChainJobs(), args));
 }
}
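Stripped of the Hadoop machinery, the shape of the pattern is just: stage 1 writes an intermediate file, stage 2 reads that file as its input, and the intermediate data is deleted once the final output exists. A minimal plain-Java sketch of that flow (all file names and the uppercase/sort transformations here are illustrative, not part of the original code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

public class ChainSketch {

    /** Runs the two "jobs" and returns the final output, comma-joined. */
    static String runChain() throws IOException {
        Path input = Files.createTempFile("input", ".txt");
        Path intermediate = Files.createTempFile("intermediate", ".txt");
        Path output = Files.createTempFile("output", ".txt");

        Files.write(input, List.of("b", "a", "c"));

        // "Job 1": read the input, transform it, write the intermediate file.
        List<String> stage1 = Files.readAllLines(input).stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        Files.write(intermediate, stage1);

        // "Job 2": its input is the intermediate file written by "Job 1".
        List<String> stage2 = Files.readAllLines(intermediate).stream()
                .sorted()
                .collect(Collectors.toList());
        Files.write(output, stage2);

        // Delete the intermediate data, mirroring fs.delete(...) in HDFS.
        Files.delete(intermediate);

        return String.join(",", Files.readAllLines(output));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(runChain()); // prints A,B,C
    }
}
```

The key design point carries over directly to the Hadoop version: stage 2 must not start until stage 1 has fully written its output, which is exactly what the blocking `job.waitForCompletion(true)` call guarantees before Job 2 is configured and submitted.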
