Apache Flink批处理模式在几分钟后失败,并打印结果



我正在使用Apache Flink读取CSV文件,然后将记录转换为一个表,从中执行SQL查询并将结果打印到stdout。

代码(简化(:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
rowDataStreamSource = env.readFile(...).disableChaining();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
final Table table = tableEnv.fromChangelogStream(rowDataStreamSource, ordersSchema, ChangelogMode.insertOnly());
tableEnv.createTemporaryView("orders", table);
Table originalSQL = tableEnv.sqlQuery(...)
originalSQL.execute().print();
env.execute();

运行此作业后,几分钟后我收到一个错误:

21:29:49.999[deploy.instance_IS_UNDEFINED,,,][mini-cluster-io-read-3]信息o.a.f.r.rpc.akka.akaRpcService-正在停止akka rpc服务。21:29:50.066[deploy.instance_IS_UNDEFINED,,][http-nio-8080-exec-1]WARN o.a.f.s.a.o.c.CollectResultFetcher-无法获取作业状态,因此我们认为工作已经终止。某些数据可能会丢失。java.lang.IollegalStateException:MiniCluster尚未运行或已经关闭。在org.apache.flink.util.Preconditions.checkState(Preconditions.java:193(在org.apache.flink.runtime.minicluster.minicluster.getDispatcherGatewayFuture(minicluster.java:877(

然后,在异常的正下方打印带有sql结果的表。

是什么导致了这里的问题?有没有办法告诉Flink没有更多的记录,这样它就可以完成任务并打印结果?

您应该使用FileSource而不是readFile,以便在批处理执行模式下正确工作:https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/file/src/FileSource.html

或者,更好的是,您可以直接使用SQL来定义一个表,该表充当接收输入文件的源,如下所述:https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/filesystem/

最新更新