我知道我可以通过将级联作业打包到 JAR 中来提交级联作业,如级联用户指南中所述。 如果我使用 CLI 命令手动提交该作业,则该作业将在我的集群上运行hadoop jar
该作业。
1级联版本中,可以通过在Hadoop JobConf
上设置某些属性来向集群提交作业。 设置 fs.defaultFS
和 mapred.job.tracker
会导致本地 Hadoop 库自动尝试将作业提交到 Hadoop1 JobTracker。 但是,设置这些属性似乎在较新版本中不起作用。 使用级联版本 2.5.3(将 CDH5 列为支持的平台)提交到 CDH5 5.2.1 Hadoop 集群会导致与服务器协商时出现 IPC 异常,如下所述。
我相信这个平台组合 - 级联2.5.6,Hadoop 2,CDH 5,YARN和MR1 API 提交 - 是基于兼容性表的支持组合(请参阅"先前版本"标题)。 使用 hadoop jar
提交作业在同一集群上工作正常。 端口 8031 在提交主机和资源管理器之间打开。 在服务器端的资源管理器日志中发现具有相同消息的错误。
我正在使用cascading-hadoop2-mr1
库。
Exception in thread "main" cascading.flow.FlowException: unhandled exception
at cascading.flow.BaseFlow.complete(BaseFlow.java:894)
at WordCount.main(WordCount.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RpcServerException): Unknown rpc kind in rpc headerRPC_WRITABLE
at org.apache.hadoop.ipc.Client.call(Client.java:1411)
at org.apache.hadoop.ipc.Client.call(Client.java:1364)
at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:231)
at org.apache.hadoop.mapred.$Proxy11.getStagingAreaDir(Unknown Source)
at org.apache.hadoop.mapred.JobClient.getStagingAreaDir(JobClient.java:1368)
at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:102)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:982)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:950)
at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:105)
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:196)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
下面是演示代码,它与级联用户指南中的 WordCount 示例基本相同。
public class WordCount {
public static void main(String[] args) {
String inputPath = "/user/vagrant/wordcount/input";
String outputPath = "/user/vagrant/wordcount/output";
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );
Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );
Pipe assembly = new Pipe( "wordcount" );
String regex = "(?<!\pL)(?=\pL)[^ ]*(?<=\pL)(?!\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );
assembly = new GroupBy( assembly, new Fields( "word" ) );
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );
Properties properties = AppProps.appProps()
.setName( "word-count-application" )
.setJarClass( WordCount.class )
.buildProperties();
properties.put("fs.defaultFS", "hdfs://192.168.30.101");
properties.put("mapred.job.tracker", "192.168.30.101:8032");
FlowConnector flowConnector = new HadoopFlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );
flow.complete();
}
}
我还尝试设置一堆其他属性来尝试使其工作:
-
mapreduce.jobtracker.address
-
mapreduce.framework.name
-
yarn.resourcemanager.address
-
yarn.resourcemanager.host
-
yarn.resourcemanager.hostname
-
yarn.resourcemanager.resourcetracker.address
这些都不起作用,它们只是导致作业在本地模式下运行(除非还设置了mapred.job.tracker
)。
我现在已经解决了这个问题。 它来自尝试使用Cloudera分发的较旧的Hadoop类,特别是JobClient。 如果将 hadoop-core
与提供的2.5.0-mr1-cdh5.2.1
版本一起使用,或者将hadoop-client
依赖项与此版本号一起使用,则会发生这种情况。 虽然这声称是MR1版本,并且我们使用MR1 API提交,但这个版本实际上只支持提交到Hadoop1 JobTracker,并且不支持YARN。
为了允许提交到 YARN,您必须将hadoop-client
依赖项与非 MR1 2.5.0-cdh5.2.1
版本一起使用,该版本仍支持将 MR1 作业提交到 YARN。