我正在使用Flink v1.4.0
.
我正在利用Flink
的原生图形API(Gelly),我用它来处理1200万个数据点(边)。我目前正在使用Flink
迷你集群通过IntelliJ
运行我的作业,该集群在同一 JVM 中执行所有任务管理器和作业管理器。
当我加载数据,有效地生成我的边时,就在我运行Flink
作业之前,我总是看到以下异常:
...
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#XXXXXXXXXX] with leader session id XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.
322062 [main] ERROR com.somecompany.some.domain.name.some.javaClass- Error executing pipeline
java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
。
我确保通过IntelliJ
编辑运行配置以添加:
-Dakka.client.timeout:600s
-Dakka.ask.timeout:600s
但例外仍然存在,我对导致它的原因没有其他线索。当我减小数据大小时,一切正常。
当我尝试通过Flink UI
提交相同的作业以在群集上安装的本地版本上运行Flink
时,会出现同样的问题。在这种情况下,作业永远不会启动,我什至无法预览自动生成的运算符 DAG。当我减少要处理的数据量时,问题再次消失。我还更新了flink-conf.yaml
以包含相同的akka
配置属性,但这没有区别。
我该如何解决这个问题?
在 IntelliJ 中运行 Flink 作业时,依赖于 Flink 迷你集群。迷你集群不同于在 Standalone、Yarn 或 Mesos 上运行 Flink,因为它依赖于单个 JVM。此外,迷你集群以多种方式进行了预配置,并且并不总是可以更改该配置(至少在某些设置方面)。
在将作业提交到集群时(而不是通过小型集群运行作业时),我必须更改的一件事是分配给作业管理器的堆内存的大小。这是必要的,因为加载要处理的数据不是我想运行的 Flink 作业的一部分(这不是 Flink 的标准做法,做这样的事情实际上是错误的)。通过增加作业管理器的堆,我能够让我的作业运行,但最终我必须为我的 Flink 作业定义一种新的输入格式,以使作业管理器不必预先加载数据来执行 - 无论如何,这不应该是它的责任。
对于手头的问题:向作业管理器分配堆内存无法通过 IntelliJ 完成(据我所知),因此作业总是会失败。