我有一个包含2个作业的MapReduce
应用程序
我需要在Job1和Job2之间运行一个小代码。换句话说,在Job1的最终输出上运行一个小代码,Job2中的所有映射器都可以使用这个小代码的输出。
这个小代码不需要并行运行。它是一个顺序代码,应该在一台机器上运行,并在HDFS中写入输出。
我想知道如何在Job1和Job2之间的应用程序代码中编写一个顺序代码,该代码将在一台机器上运行,并从HDFS读取Job 1
在Driver(main)类中,在执行Job1(通常是JobClient.runJob(conf);
命令)之后,在设置Job2的第一个命令之前,使用hadoop的FileSystem
来获取Job 1ob2的输入,如下所示:
//...run job1
BufferedWriter bw = null;
BufferedReader br = null;
try {
String job1outPath = "/job1/output/";
Path job2in = new Path("/job2/in/oneFile.txt");
bw = new BufferedWriter(new OutputStreamWriter(fs.create(job2in,true)));
FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] status = fs.listStatus(new Path(job1outPath));
for (int i=0;i<status.length;i++){
br=new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
String line;
while ((line = br.readLine())!= null){
bw.write("whatever"); //process the output of job1 as you wish
bw.newLine();
}
}
} catch (IOException e) {
System.err.println(e);
} finally {
try{
bw.close();
br.close(); //not sure if you need this inside the for loop
} catch (IOException io) {
System.out.println(io);
}
}
//optionally delete "/job1/output/*" here
//...run job2 using "/job2/in" as your input path
这将在主节点中按顺序运行。如果您愿意,也可以使用System.out.println()
命令在您的终端中查看有用的信息。
希望它能编译,我没有通过编辑,但我相信你能让它发挥作用。