使用Java客户端在Apache YARN上运行MapReduce应用程序



我想使用Java客户端代码在YARN集群上运行MapReduce应用程序。例如,我想使用JavaAPI将驻留在hadoop-examples.jar文件中的WordCount提交给一个由16台机器组成的YARN集群。

我试着遵循这个教程,但我没有得到什么是应用程序主jar。它和hadoop-examples.jar一样吗?或者另一个jar包含ApplicationMaster逻辑?

如果您有一个从a到z的Java客户端代码示例来向YARN提交MapReduce应用程序,我将不胜感激。

更新:

我感兴趣的是使用YarnAPI(例如YarnClient、ApplicationClientProtocol…)将MapReduce作业作为Yarn应用程序提交,这与这个问题不同。

不知道这对单词计数有多大帮助,但我已经构建了自己的个人映射器和还原器,可以处理R脚本。这里没有所有的复杂性,就是我如何提交一份简单的工作。

ArtisanJob只是一个扩展org.apache.hadoop.mapreducede.Job的类。它为我的功能提供了一些额外的方法。你可以用org.apache.hadoop.mapreduce.Job代替ArtisanJob,它对你来说应该很好。

我的ArtisanConfiguration扩展了导入org.apache.hoop.conf.Configuration,也可以只替换为导入org.apache.Hoop.conf.Configuration.

MetricInputFormat和MetricOutputFormat是相同的,它们是分别扩展InputFormat和OutputFormat的简单适配器。

如果您有任何问题,请告诉我,但这是使用mrv2的hadoop 2.4.1的工作代码。

public String execute(IHadoopJobConfiguration jobDetails)
        throws HadoopJobException {
    try {
        ArtisanJob job = createJob(jobDetails);
        job.submit();
        return job.getJobID().toString();
    } catch (ClassNotFoundException | IOException | InterruptedException
            | RAnalyticsException | ConfigurationException e) {
        logger.log(Level.SEVERE, "Unable to execute job", e);
        throw new HadoopJobException("Unable to execute operation", e);
    } catch (Exception e) {
        throw new HadoopJobException("Unable to execute operation", e);
    }
}

ArtisanJob createJob(IHadoopJobConfiguration details)
        throws IOException, ConfigurationException, RAnalyticsException {
    IOperation mapperOperation =  details.getMapperOperation();
    IOperation reducerOperation = details.getReducerOperation();
    OperationConfiguration conf = new OperationConfiguration();
    conf.setDataProviders(details.getDataProviders());
    conf.setOperationInputs(details.getUserInputs());
    ArtisanJob job = new ArtisanJob(new ArtisanConfiguration());
    // Tell the job to be local for right now
    job.getConfiguration().set("mapreduce.framework.name", "local");
    job.setMapperClass(ROperationMapper.class);
    job.setReducerClass(ROperationReducer.class);
    job.setInputFormatClass(MetricInputFormat.class);
    job.setOutputFormatClass(MetricOutputFormat.class);
    job.setMapOutputKeyClass(MetricKey.class);
    job.setMapOutputValueClass(MetricWritable.class);
    job.setJarByClass(MetricInputFormat.class);
    job.getConfiguration()
            .set("conf.column",
                    props.getProperty("com.artisan.orchestrator.hbase.metric.colfamily"));
    // Set the output type to hbase so that it will write the outputs to 
    // our hbase server
    MetricOutputFormat.setOutputAdatperType(job.getConfiguration(),
            OutputAdapterType.HBASE);
    // Set the input to be the http service, this needs to be more modular.
    MetricInputFormat.setInputAdapterType(job.getConfiguration(),
            InputAdapterType.HTTP);
    job.setMapperOperation(mapperOperation);
    job.setReducerOperation(reducerOperation);

    logger.log(Level.SEVERE, "Job class is " + job.getJar());
    return job;
}

相关内容

  • 没有找到相关文章

最新更新