How to submit a MapReduce job using the YARN API in Java



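I want to submit my MR job using the YARN Java API. I tried to do it the way the "Writing YARN Applications" guide describes, but I don't know what to put in the AM container (amContainer). Here is the code I have written: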

package org.apache.hadoop.examples;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.util.Records;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class YarnJob {
    private static Logger logger = LoggerFactory.getLogger(YarnJob.class);
    public static void main(String[] args) throws Throwable {
        Configuration conf = new Configuration();
        YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();
        System.out.println(JSON.toString(client.getAllQueues()));
        System.out.println(JSON.toString(client.getConfig()));
        //System.out.println(JSON.toString(client.getApplications()));
        System.out.println(JSON.toString(client.getYarnClusterMetrics()));
        YarnClientApplication app = client.createApplication();
        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
        ApplicationId appId = appResponse.getApplicationId();
        // Create launch context for app master
        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
        // set the application id
        appContext.setApplicationId(appId);
        // set the application name
        appContext.setApplicationName("test");
        // Set the queue to which this application is to be submitted in the RM
        appContext.setQueue("default");
        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        //amContainer.setLocalResources();
        //amContainer.setCommands();
        //amContainer.setEnvironment();
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(Resource.newInstance(1024, 1));
        appContext.setApplicationType("MAPREDUCE");
        // Submit the application to the applications manager
        client.submitApplication(appContext);
        //client.stop();
    }
}

I can run the MapReduce job correctly from the command line:

hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/admin/input /user/admin/output/

But how do I submit this WordCount job through the YARN Java API?

You do not use the YARN client to submit the job; you use the MapReduce APIs instead. See this link for an example.

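However, if you need more control over the job, such as getting the completion status, the map phase status, the reduce phase status, and so on, you can use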

job.submit();

instead of

job.waitForCompletion(true)

You can get the status with the methods job.mapProgress() and job.reduceProgress(). The Job object has many more methods you can explore.

As for your question about

hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/admin/input /user/admin/output/

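What happens here is that you are running the driver program contained in wordcount.jar. Instead of running "java -jar wordcount.jar", you are using "hadoop jar wordcount.jar". You could also use "yarn jar wordcount.jar". Compared with the java -jar command, hadoop/yarn sets up the additional classpath entries that are needed. The command executes the "main()" of your driver program, which lives in the class named on the command line, org.apache.hadoop.examples.WordCount.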
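If the goal is to trigger that same command from Java code, one option is simply to start the hadoop launcher as a child process. The following is a minimal sketch under some assumptions, not a definitive implementation: the class name HadoopJarLauncher, its methods buildCommand and run, and the "--run" switch are made up for illustration, and the sketch assumes the hadoop script is on the PATH of the calling process. It uses only the JDK's ProcessBuilder, so nothing from Hadoop has to be on the Java classpath.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Hadoop): launches "hadoop jar ..." as a child process.
public class HadoopJarLauncher {

    /** Builds the argument list for: hadoop jar <jarPath> <mainClass> <jobArgs...> */
    public static List<String> buildCommand(String jarPath, String mainClass, String... jobArgs) {
        List<String> command = new ArrayList<>();
        command.add("hadoop"); // assumption: the hadoop launcher script is on PATH
        command.add("jar");
        command.add(jarPath);
        command.add(mainClass);
        command.addAll(Arrays.asList(jobArgs));
        return command;
    }

    /** Starts the command, forwards its output to this process, and returns its exit code. */
    public static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .inheritIO() // show the job's console output in our own stdout/stderr
                .start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        List<String> command = buildCommand(
                "wordcount.jar",
                "org.apache.hadoop.examples.WordCount",
                "/user/admin/input",
                "/user/admin/output/");
        // Dry run by default: only print the command that would be executed.
        System.out.println(String.join(" ", command));
        // Pass "--run" (a made-up switch for this sketch) to actually launch the job.
        if (args.length > 0 && args[0].equals("--run")) {
            System.exit(run(command));
        }
    }
}
```

Run without arguments, this only prints the command line; with "--run" it blocks until the hadoop process exits and passes the exit code through. The trade-off is that you get only an exit code and console output back, not the progress information the Job object exposes.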

You can look at the source of the WordCount class.

I assume the only reason you want to submit the job through YARN is to integrate it with some service that launches MapReduce2 jobs.

For that, you can always have your driver's main() look something like this.

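import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyMapReduceDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /******/
        int errCode = ToolRunner.run(conf, new MyMapReduceDriver(), args);
        System.exit(errCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Service loop: submit a job, wait for it to finish, then start over.
        while (true) {
            try {
                runMapReduceJob();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void runMapReduceJob() throws IOException, InterruptedException, ClassNotFoundException {
        // Reuse the configuration that ToolRunner handed to this Tool.
        Job job = Job.getInstance(getConf(), "word count");
        /******/
        // submit() returns immediately, unlike waitForCompletion(true).
        job.submit();
        // Poll the status until the job leaves the PREP and RUNNING states.
        while (job.getJobState() == JobStatus.State.RUNNING || job.getJobState() == JobStatus.State.PREP) {
            Thread.sleep(1000);
            System.out.println(" Map: " + StringUtils.formatPercent(job.mapProgress(), 0)
                    + " Reducer: " + StringUtils.formatPercent(job.reduceProgress(), 0));
        }
    }
}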

Hope this helps.
