链接多个hadoop作业并提交,无需等待



我试图找出如何链接多个hadoop作业,与一个步骤的输出馈送到下一步的输入。我在谷歌上找到的很多东西都说,我应该从一个线程中一次调用它们并等待完成,或者我应该使用Job.addDependingJob(),然后提交它们。我选择了后者,但是在前一个任务完成后,我似乎无法让后续的任务执行。

下面是我的代码:

List<Job> jobs = new ArrayList<Job>();
for(int i = 0; i < stepCount; i++) {
    JobConf jc = new JobConf(clusterConfig);
    ... set up mappers and reducers here ...
    ... set up input and output paths here ...
    Job j = new Job(jc);
    j.addDependingJob(jobs.get(jobs.size() - 1);
    jobs.add(j);
}
for(Job j : Jobs) {
    JobClient client = new JobClient();
    client.init(j.getJobConf());
    client.submit(j.getJobConf());
}

所有的作业同时运行,我得到这样的输出:

  • 没有作业jar文件集。可能找不到用户类。参见JobConf(Class)或JobConf#setJar(String)。
  • 过程输入路径总数:1
  • 使用GenericOptionsParser解析参数应用程序应该实现相同的工具。
  • 没有作业jar文件集。可能找不到用户类。参见JobConf(Class)或JobConf#setJar(String)。
  • 到进程的总输入路径:0
  • 使用GenericOptionsParser解析参数应用程序应该实现相同的工具。
  • 没有作业jar文件集。可能找不到用户类。参见JobConf(Class)或JobConf#setJar(String)。
  • 到进程的总输入路径:0
  • 使用GenericOptionsParser解析参数应用程序应该实现相同的工具。
  • 没有作业jar文件集。可能找不到用户类。参见JobConf(Class)或JobConf#setJar(String)。
  • 到进程的总输入路径:0

我做错了什么?

注意:我使用Hadoop 0.20.205

编辑澄清:我需要能够向集群提交作业链,然后立即返回,而无需等待作业链完成。

JobControl应该用来设置作业之间的依赖关系。在给定的代码中没有设置依赖项,因此作业是并行运行的,而不是按顺序运行的。如果有更复杂的工作流程,那么可以使用Oozie。

我已经处理这个问题好几年了,但是我看到了一些事情:

    你的错误与作业之间的链接没有任何关系。在担心链接它们之前,请确保您可以获得单个作业来运行。
  1. 作业控制不(或至少没有在2010年)提交作业的顺序到作业跟踪器,它只是一个工具来处理检查,当上游作业已经完成,并自动提交下一个作业跟踪器。
  2. 你不应该在作业上调用submit。这就迫使他们逃跑。你应该把控制权交给作业控制。

我认为这是令人困惑的,并开始在https://github.com/kevinpet/jobcontrol中编写我自己的DAG帮助程序,您可能会或可能不会发现有用。

以下是链接map reduce作业的方法。这里我们在第一个作业的输出上运行第二个作业:

        Job job1 = new Job(conf, "job1");
    Job job2 = new Job(conf,"job2");
    job1.setJarByClass(driver.class);
    job1.setMapperClass(Map.class);
    job1.setCombinerClass(Reduce.class);
    job1.setReducerClass(Reduce.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);
    String outputpath="/user/hadoop/firstjoboutput";
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(outputpath));
    job1.waitForCompletion(true);
    job2.setJarByClass(driver.class);
    job2.setMapperClass(SecondMap.class);
    job2.setReducerClass(SecondReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job2, new Path(outputpath));
    String finaloutput="/user/hadoop/finaloutput";
    FileOutputFormat.setOutputPath(job2, new Path(finaloutput));

    System.exit(job2.waitForCompletion(true) ? 0 : 1);

相关内容

  • 没有找到相关文章

最新更新