如何从 Java 将级联作业提交到远程 YARN 集群



我知道我可以通过将级联作业打包到 JAR 中来提交级联作业,如级联用户指南中所述。 如果我使用 CLI 命令手动提交该作业,则该作业将在我的集群上运行hadoop jar该作业。

但是,在原始的Hadoop

1级联版本中,可以通过在Hadoop JobConf上设置某些属性来向集群提交作业。 设置 fs.defaultFSmapred.job.tracker 会导致本地 Hadoop 库自动尝试将作业提交到 Hadoop1 JobTracker。 但是,设置这些属性似乎在较新版本中不起作用。 使用级联版本 2.5.3(将 CDH5 列为支持的平台)提交到 CDH5 5.2.1 Hadoop 集群会导致与服务器协商时出现 IPC 异常,如下所述。

我相信这个平台组合 - 级联2.5.6,Hadoop 2,CDH 5,YARN和MR1 API 提交 - 是基于兼容性表的支持组合(请参阅"先前版本"标题)。 使用 hadoop jar 提交作业在同一集群上工作正常。 端口 8031 在提交主机和资源管理器之间打开。 在服务器端的资源管理器日志中发现具有相同消息的错误。

我正在使用cascading-hadoop2-mr1库。

Exception in thread "main" cascading.flow.FlowException: unhandled exception
    at cascading.flow.BaseFlow.complete(BaseFlow.java:894)
    at WordCount.main(WordCount.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RpcServerException): Unknown rpc kind in rpc headerRPC_WRITABLE
    at org.apache.hadoop.ipc.Client.call(Client.java:1411)
    at org.apache.hadoop.ipc.Client.call(Client.java:1364)
    at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:231)
    at org.apache.hadoop.mapred.$Proxy11.getStagingAreaDir(Unknown Source)
    at org.apache.hadoop.mapred.JobClient.getStagingAreaDir(JobClient.java:1368)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:102)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:982)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:950)
    at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:105)
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:196)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

下面是演示代码,它与级联用户指南中的 WordCount 示例基本相同。

public class WordCount {
    public static void main(String[] args) {
        String inputPath = "/user/vagrant/wordcount/input";
        String outputPath = "/user/vagrant/wordcount/output";
        Scheme sourceScheme = new TextLine( new Fields( "line" ) );
        Tap source = new Hfs( sourceScheme, inputPath );
        Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
        Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );
        Pipe assembly = new Pipe( "wordcount" );

        String regex = "(?<!\pL)(?=\pL)[^ ]*(?<=\pL)(?!\pL)";
        Function function = new RegexGenerator( new Fields( "word" ), regex );
        assembly = new Each( assembly, new Fields( "line" ), function );

        assembly = new GroupBy( assembly, new Fields( "word" ) );
        Aggregator count = new Count( new Fields( "count" ) );
        assembly = new Every( assembly, count );
        Properties properties = AppProps.appProps()
            .setName( "word-count-application" )
            .setJarClass( WordCount.class )
            .buildProperties();
        properties.put("fs.defaultFS", "hdfs://192.168.30.101");
        properties.put("mapred.job.tracker", "192.168.30.101:8032");
        FlowConnector flowConnector = new HadoopFlowConnector( properties );
        Flow flow = flowConnector.connect( "word-count", source, sink, assembly );
        flow.complete();
    }
}

我还尝试设置一堆其他属性来尝试使其工作:

  • mapreduce.jobtracker.address
  • mapreduce.framework.name
  • yarn.resourcemanager.address
  • yarn.resourcemanager.host
  • yarn.resourcemanager.hostname
  • yarn.resourcemanager.resourcetracker.address

这些都不起作用,它们只是导致作业在本地模式下运行(除非还设置了mapred.job.tracker)。

我现在已经解决了这个问题。 它来自尝试使用Cloudera分发的较旧的Hadoop类,特别是JobClient。 如果将 hadoop-core 与提供的2.5.0-mr1-cdh5.2.1版本一起使用,或者将hadoop-client依赖项与此版本号一起使用,则会发生这种情况。 虽然这声称是MR1版本,并且我们使用MR1 API提交,但这个版本实际上只支持提交到Hadoop1 JobTracker,并且不支持YARN。

为了允许提交到 YARN,您必须将hadoop-client依赖项与非 MR1 2.5.0-cdh5.2.1 版本一起使用,该版本仍支持将 MR1 作业提交到 YARN。

相关内容

  • 没有找到相关文章