Starting a batch job from a streaming job

Hi, I have a Maven project for Flink stream processing. Depending on the messages I receive from the stream, I start a batch job, but at the moment I am getting an error.

I am very new to the Flink world, so please let me know if you have any ideas. Here is the code I use to start the streaming job on the standalone cluster.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaConsumerService kafkaConsumerService = new KafkaConsumerService();
FlinkKafkaConsumer010<String> kafkaConsumer = kafkaConsumerService.getKafkaConsumer(settings);
DataStream<String> messageStream = env.addSource(kafkaConsumer).setParallelism(3);
messageStream
        .filter(new MyFilter()).setParallelism(3).name("Filter")
        .map(new ProcessFile(arg)).setParallelism(3).name("start batch")
        .addSink(new DiscardingSink<String>()).setParallelism(3).name("DiscardData");
env.execute("Stream processor");
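
For completeness, KafkaConsumerService above is just a thin helper of mine around the connector constructor. A minimal sketch of what it amounts to, assuming settings is a java.util.Properties object carrying the usual Kafka options plus a hypothetical "topic" key:

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaConsumerService {
    // Builds the Kafka source for the streaming job. The "topic" property key
    // is an assumption; bootstrap.servers, group.id etc. ride in the same Properties.
    public FlinkKafkaConsumer010<String> getKafkaConsumer(Properties settings) {
        return new FlinkKafkaConsumer010<>(
                settings.getProperty("topic"),
                new SimpleStringSchema(),
                settings);
    }
}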

The ProcessFile map class:

public class ProcessFile implements MapFunction<String, String> {

    // JobManager coordinates and the batch jar location are fields of this class.
    private String jobMasterHost;
    private int jobMasterPort;
    private String jarLocation;

    public ProcessFile(String arg) { }

    @Override
    public String map(String message) throws Exception {
        MessageType typedmessage = ParseMessage(message);
        if (isWhatIwant()) {
            String[] batchArgs = createBatchArgs();
            // Point a cluster client at the standalone cluster's JobManager.
            Configuration config = new Configuration();
            config.setString(JobManagerOptions.ADDRESS, jobMasterHost);
            config.setInteger(JobManagerOptions.PORT, jobMasterPort);
            StandaloneClusterClient client = new StandaloneClusterClient(config);
            client.setDetached(true); // fire-and-forget; do not block the stream on the batch job
            // The batch job ships in its own jar and is submitted with parallelism 7.
            PackagedProgram program = new PackagedProgram(
                    new File(jarLocation), SupplyBatchJob.class.getName(), batchArgs);
            client.run(program, 7);
        }
        // map() is declared String -> String, so pass the message through
        // (the downstream sink discards it anyway).
        return message;
    }
}

The error below was copied from the JobManager web portal. This is what I get:

org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
    at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:497)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
    at cw.supply.data.parser.maps.ProcessFileMessage.map(ProcessFileMessage.java:47)
    at cw.supply.data.parser.maps.ProcessFileMessage.map(ProcessFileMessage.java:25)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:269)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:483)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
    at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:789)
    at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:495)
    ... 30 more
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
    at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
    at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:784)
    ... 31 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at scala.concurrent.Await.result(package.scala)
    at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
    ... 32 more

After getting access to the environment and verifying it, I figured out what the problem was: I was using the JobManager's public address, on which the port is not open. I switched to the private IP instead, since all the nodes are in the same subnet and there is no need to open the port to the world. Hope this helps someone else as well.
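
In code terms, the fix is only about which address goes into the client's configuration. A minimal sketch, with a placeholder private IP and Flink's default JobManager RPC port of 6123:

import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public class BatchClientFactory {
    // Build the client against the JobManager's private/subnet address.
    // 10.0.0.12 is a placeholder; on the public address the RPC port was
    // closed, which is what produced the LeaderRetrievalException timeout.
    static StandaloneClusterClient privateAddressClient() throws Exception {
        Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "10.0.0.12"); // private IP, not the public one
        config.setInteger(JobManagerOptions.PORT, 6123);          // default JobManager RPC port
        return new StandaloneClusterClient(config);
    }
}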
