I want to run a MapReduce application on a YARN cluster from Java client code. For example, I would like to use the Java API to submit the WordCount that lives in hadoop-examples.jar
to a YARN cluster of 16 machines.
I tried to follow this tutorial, but I don't understand what the application master jar is. Is it the same as hadoop-examples.jar,
or is it another jar that contains the ApplicationMaster logic?
I would be grateful for an A-to-Z Java client code example for submitting a MapReduce application to YARN.
Update:
What I am interested in is submitting the MapReduce job as a YARN application using the YARN APIs (e.g. YarnClient, ApplicationClientProtocol, ...), which is what makes this different from this question.
Not sure how much this helps with WordCount, but I've built my own custom mapper and reducer that can process R scripts. Stripped of all the complexity, this is how I submit a simple job.
ArtisanJob is just a class that extends org.apache.hadoop.mapreduce.Job. It adds a few extra methods for my functionality; you can substitute org.apache.hadoop.mapreduce.Job for ArtisanJob and it should work fine for you.
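For the WordCount case from the question, a minimal sketch with plain org.apache.hadoop.mapreduce.Job might look like the following (untested here; it assumes the examples jar, which contains org.apache.hadoop.examples.WordCount, and the cluster's core-site.xml/yarn-site.xml/mapred-site.xml are on the client classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountSubmitter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Submit through YARN instead of running in-process.
        conf.set("mapreduce.framework.name", "yarn");

        Job job = Job.getInstance(conf, "word count");
        // Ships the jar that contains WordCount (the examples jar) to the cluster.
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.submit(); // returns immediately once the job is handed off
        System.out.println("Submitted " + job.getJobID());
    }
}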
My ArtisanConfiguration extends org.apache.hadoop.conf.Configuration and can likewise be replaced with plain org.apache.hadoop.conf.Configuration.
MetricInputFormat and MetricOutputFormat are the same story: they are simple adapters that extend InputFormat and OutputFormat respectively.
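To give an idea of what such an adapter involves, here is a stripped-down, hypothetical InputFormat over a non-file source (the class name and record contents are made up for illustration; a real adapter would fetch its records from the external service instead of the in-memory array):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class InMemoryMetricInputFormat extends InputFormat<Text, IntWritable> {

    // One split covering the whole external source; splits must be Writable
    // so the framework can ship them to the task JVMs.
    public static class WholeSourceSplit extends InputSplit implements Writable {
        @Override public long getLength() { return 1; }
        @Override public String[] getLocations() { return new String[0]; } // no locality
        @Override public void write(DataOutput out) throws IOException { }
        @Override public void readFields(DataInput in) throws IOException { }
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) {
        return Collections.<InputSplit>singletonList(new WholeSourceSplit());
    }

    @Override
    public RecordReader<Text, IntWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new RecordReader<Text, IntWritable>() {
            private final String[] records = {"alpha", "beta"}; // stand-in data
            private int pos = -1;
            @Override public void initialize(InputSplit s, TaskAttemptContext c) { }
            @Override public boolean nextKeyValue() { return ++pos < records.length; }
            @Override public Text getCurrentKey() { return new Text(records[pos]); }
            @Override public IntWritable getCurrentValue() { return new IntWritable(1); }
            @Override public float getProgress() {
                return Math.min(1f, (pos + 1f) / records.length);
            }
            @Override public void close() { }
        };
    }
}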
Let me know if you have any questions, but the execute/createJob code below is working code on Hadoop 2.4.1 with MRv2.
public String execute(IHadoopJobConfiguration jobDetails)
        throws HadoopJobException {
    try {
        ArtisanJob job = createJob(jobDetails);
        job.submit();
        return job.getJobID().toString();
    } catch (ClassNotFoundException | IOException | InterruptedException
            | RAnalyticsException | ConfigurationException e) {
        logger.log(Level.SEVERE, "Unable to execute job", e);
        throw new HadoopJobException("Unable to execute operation", e);
    } catch (Exception e) {
        throw new HadoopJobException("Unable to execute operation", e);
    }
}
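// Note: job.submit() returns as soon as the job has been handed off to the
// framework; call job.waitForCompletion(true) instead if the caller should
// block until the job finishes and print progress as it runs.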
ArtisanJob createJob(IHadoopJobConfiguration details)
        throws IOException, ConfigurationException, RAnalyticsException {

    IOperation mapperOperation = details.getMapperOperation();
    IOperation reducerOperation = details.getReducerOperation();

    OperationConfiguration conf = new OperationConfiguration();
    conf.setDataProviders(details.getDataProviders());
    conf.setOperationInputs(details.getUserInputs());

    ArtisanJob job = new ArtisanJob(new ArtisanConfiguration());

    // Tell the job to be local for right now
    job.getConfiguration().set("mapreduce.framework.name", "local");
    job.setMapperClass(ROperationMapper.class);
    job.setReducerClass(ROperationReducer.class);
    job.setInputFormatClass(MetricInputFormat.class);
    job.setOutputFormatClass(MetricOutputFormat.class);
    job.setMapOutputKeyClass(MetricKey.class);
    job.setMapOutputValueClass(MetricWritable.class);
    job.setJarByClass(MetricInputFormat.class);

    job.getConfiguration().set("conf.column",
            props.getProperty("com.artisan.orchestrator.hbase.metric.colfamily"));

    // Set the output type to hbase so that it will write the outputs to
    // our hbase server
    MetricOutputFormat.setOutputAdatperType(job.getConfiguration(),
            OutputAdapterType.HBASE);

    // Set the input to be the http service, this needs to be more modular.
    MetricInputFormat.setInputAdapterType(job.getConfiguration(),
            InputAdapterType.HTTP);

    job.setMapperOperation(mapperOperation);
    job.setReducerOperation(reducerOperation);

    logger.log(Level.SEVERE, "Job class is " + job.getJar());

    return job;
}
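On the update about YarnClient/ApplicationClientProtocol: with mapreduce.framework.name set to yarn, Job.submit() already drives the YARN submission protocol for you (through YARNRunner, which also arranges the MapReduce ApplicationMaster, MRAppMaster, so there is no separate AM jar to build for a plain MapReduce job). If you still want to talk to YARN directly, for example to monitor the job you just submitted, a small sketch with YarnClient could look like this (untested; it relies on the Hadoop 2.x convention that job_<timestamp>_<id> corresponds to application_<timestamp>_<id>):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class JobProbe {
    public static void main(String[] args) throws Exception {
        // A job id like "job_1400000000000_0001" corresponds to the YARN
        // application id "application_1400000000000_0001".
        String appIdStr = args[0].replaceFirst("^job_", "application_");

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration()); // picks up yarn-site.xml
        yarnClient.start();
        try {
            ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            System.out.println(report.getName() + ": "
                    + report.getYarnApplicationState());
        } finally {
            yarnClient.stop();
        }
    }
}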