使用 beam 2.6 和 Flink 1.5.3 运行测试。 光束运行本地 Flink 流道没有问题。但无法在 flink 集群上运行。尝试 MVN 和 Flink 提交作业。使用 mvn 运行时,我使用了:
mvn clean package -Pflink-runner exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=FlinkRunner --flinkMaster=c2:6123 --filesToStage=target/word-count-beam-bundled-0.1.jar"
这会导致请求 Blob 服务器端口上的问题阻止,并且无法继续
2018-9-21上午11:47:38 org.apache.zookeeper.ClientCnxn$SendThread logStartConnect INFO: 打开与服务器的套接字连接 192.168.0.12/192.168.0.12:2181. 不会尝试使用 SASL 进行身份验证(未知错误( 2018-9-21上午11:47:38 org.apache.zookeeper.ClientCnxn$SendThread primeConnection INFO: 已建立到 192.168.0.12/192.168.0.12:2181 的套接字连接, 启动会话 2018-9-21上午11:47:38 org.apache.zookeeper.ClientCnxn$SendThread onConnected INFO: Session 在服务器 192.168.0.12/192.168.0.12:2181 上完成建立, 会话 ID = 0x165adfcdcd9104b,协商超时 = 90000 2018-09-21 11:47:38 信息 连接管理器$HConnection实现:2155 - 关闭主协议: 主服务 2018-09-21 11:47:38 信息 连接管理器$HConnection实施:1712 - 关闭动物园管理员 sessionid=0x165adfcdcd9104b 2018年9月21日 上午11:47:38 org.apache.zookeeper.ClientCnxn$EventThread run INFO: EventThread closed down Sep 21, 2018 11:47:38 AM org.apache.zookeeper.ZooKeeper close 信息:会议:0x165adfcdcd9104b关闭 Sep 21, 2018 11:47:39 AM org.apache.flink.client.program.rest.RestClusterClient submitJob INFO: 提交作业 aa366f2bd4bff3ddab47c1a890c84256(已分离:假(。九月 21, 2018 11:47:39 上午 org.apache.flink.client.program.rest.RestClusterClient submitJob INFO:请求 Blob 服务器端口。
使用 Flink 直接提交作业时,请使用:
$FLINK_HOME/bin/flink run -c org.apache.beam.examples.WordCount target/test-beam-bundled-0.1.jar --runner=FlinkRunner --flinkMaster=c2:6123 --filesToStage=target/word-count-beam-bundled-0.1.jar
它抛出以下异常:
程序完成,但出现以下异常:
远程环境在 预定义的上下文(例如命令行客户端、Scala Shell 或 测试环境( org.apache.flink.api.java.RemoteEnvironment.(远程环境.java:126( org.apache.flink.api.java.RemoteEnvironment.(远程环境.java:86( org.apache.flink.api.java.ExecutionEnvironment.createRemoteEnvironment(ExecutionEnvironment.java:1168( org.apache.beam.runners.flink.FlinkExecutionEnvironments.createBatchExecutionEnvironment(FlinkExecutionEnvironments.java:58( org.apache.beam.runners.flink.Flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:93( org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110( org.apache.beam.sdk.Pipeline.run(Pipeline.java:313( org.apache.beam.sdk.Pipeline.run(Pipeline.java:299(
知道吗?
我发现这是光束版本问题。Beam java sdk 只能与 Flink 1.5.1 一起使用。Flink 1.5.2+ 中有一个更改,其中 blob 上传方法发生了变化。Beam 无法通过 Flink blob rest API 加载罐子。