我试图找出如何链接多个hadoop作业,与一个步骤的输出馈送到下一步的输入。我在谷歌上找到的很多东西都说,我应该从一个线程中一次调用它们并等待完成,或者我应该使用Job.addDependingJob(),然后提交它们。我选择了后者,但是在前一个任务完成后,我似乎无法让后续的任务执行。
下面是我的代码:
List<Job> jobs = new ArrayList<Job>();
for(int i = 0; i < stepCount; i++) {
JobConf jc = new JobConf(clusterConfig);
... set up mappers and reducers here ...
... set up input and output paths here ...
Job j = new Job(jc);
j.addDependingJob(jobs.get(jobs.size() - 1);
jobs.add(j);
}
for(Job j : Jobs) {
JobClient client = new JobClient();
client.init(j.getJobConf());
client.submit(j.getJobConf());
}
所有的作业同时运行,我得到这样的输出:
- 没有作业jar文件集。可能找不到用户类。参见JobConf(Class)或JobConf#setJar(String)。
- 过程输入路径总数:1
- 使用GenericOptionsParser解析参数应用程序应该实现相同的工具。
- 没有作业jar文件集。可能找不到用户类。参见JobConf(Class)或JobConf#setJar(String)。
- 到进程的总输入路径:0
- 使用GenericOptionsParser解析参数应用程序应该实现相同的工具。
- 没有作业jar文件集。可能找不到用户类。参见JobConf(Class)或JobConf#setJar(String)。
- 到进程的总输入路径:0
- 使用GenericOptionsParser解析参数应用程序应该实现相同的工具。
- 没有作业jar文件集。可能找不到用户类。参见JobConf(Class)或JobConf#setJar(String)。
- 到进程的总输入路径:0
我做错了什么?
注意:我使用Hadoop 0.20.205
编辑澄清:我需要能够向集群提交作业链,然后立即返回,而无需等待作业链完成。
JobControl应该用来设置作业之间的依赖关系。在给定的代码中没有设置依赖项,因此作业是并行运行的,而不是按顺序运行的。如果有更复杂的工作流程,那么可以使用Oozie。
我已经处理这个问题好几年了,但是我看到了一些事情:
- 你的错误与作业之间的链接没有任何关系。在担心链接它们之前,请确保您可以获得单个作业来运行。
- 作业控制不(或至少没有在2010年)提交作业的顺序到作业跟踪器,它只是一个工具来处理检查,当上游作业已经完成,并自动提交下一个作业跟踪器。
- 你不应该在作业上调用submit。这就迫使他们逃跑。你应该把控制权交给作业控制。
我认为这是令人困惑的,并开始在https://github.com/kevinpet/jobcontrol中编写我自己的DAG帮助程序,您可能会或可能不会发现有用。
以下是链接map reduce作业的方法。这里我们在第一个作业的输出上运行第二个作业:
Job job1 = new Job(conf, "job1");
Job job2 = new Job(conf,"job2");
job1.setJarByClass(driver.class);
job1.setMapperClass(Map.class);
job1.setCombinerClass(Reduce.class);
job1.setReducerClass(Reduce.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
String outputpath="/user/hadoop/firstjoboutput";
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(outputpath));
job1.waitForCompletion(true);
job2.setJarByClass(driver.class);
job2.setMapperClass(SecondMap.class);
job2.setReducerClass(SecondReducer.class);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job2, new Path(outputpath));
String finaloutput="/user/hadoop/finaloutput";
FileOutputFormat.setOutputPath(job2, new Path(finaloutput));
System.exit(job2.waitForCompletion(true) ? 0 : 1);