我有一个Apache Beam Pipeline,我正试图将其部署在本地部署的Flink Docker集群上。
导致管道故障
The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)
org.apache.flink.api.java.RemoteEnvironmentConfigUtils.validate(RemoteEnvironmentConfigUtils.java:52)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.validateAndGetEffectiveConfiguration(RemoteStreamEnvironment.java:178)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:158)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:144)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:113)
org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.<init>(FlinkExecutionEnvironments.java:319)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:177)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:139)
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:98)
org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
ApacheBeamPocJava.main(ApacheBeamPocJava.java:262)
这就是我设置管道的方式
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); options.setRunner(FlinkRunner.class);
options.setFlinkMaster(“localhost:6123”);
options.setFilesToStage(Arrays.asList("path to the beam jar"));
FlinkRunner flinkRunner = FlinkRunner.fromOptions(options);
Pipeline p= Pipeline.create(options);
并且在定义了管道的步骤之后。我像这样运行
flinkRunner.run(p);
这就是我提交作业的方式
flink run -c ClassName PATH_TO_JAR
有人能告诉我这里出了什么问题吗?
此外,如果某人具有Beam<->;Flink示例方便Java使用。我当然也很感激。
您似乎已经在管道本身内部定义了运行环境。你有没有试过像Flink runner文档中描述的那样启动你的管道?(删除代码中定义或配置runner的部分。(
由于Beam是一个将代码与执行它的运行程序解耦的框架,因此没有必要在管道代码本身中使用Flink运行程序配置。如果您可以使用direct runner在本地执行管道,那么在使用正确的概要文件编译时,它也可以在Flink runner(或任何其他受支持的运行程序(上运行。
bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar –runner=FlinkRunner –other-parameters-for-your-pipeline-or-the-runner
请注意,Flink runner的Beam 2.25.0中目前存在一个错误,请在2.24.0版本或更高版本发布时进行尝试。