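I am running a streaming application in a local environment (an embedded Flink cluster). I have processed a particular dataset successfully several times with this code. Yesterday I made some changes to the processing logic and wanted to rerun the application. After about 3/4 of the data had been processed, however, the Flink cluster seemed to crash for no apparent reason. Here is the condensed log, with my comments inserted in angle brackets <>: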
2018-02-09 12:04:05,146 [INFO] from a.b.l.f.MultiS3FileSource in Source: General source (1/1) - inserting 266574 events
2018-02-09 12:10:55,094 [ERROR] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11020 - class org.apache.flink.runtime.client.JobSubmissionClientActor received unknown message:
2018-02-09 12:10:55,245 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Process -> Detection(7/8) switched to CANCELED ) because there is currently no valid leader id known.
2018-02-09 12:10:55,268 [WARN] from o.a.f.r.c.JobSubmissionClientActor in flink-akka.actor.default-dispatcher-11019 - Discard message LeaderSessionMessage(7240d925-8573-44e8-996c-fa4658ab0463,02/09/2018 12:10:55 Enrichment-> Flat Map(7/8) switched to CANCELED ) because there is currently no valid leader id known.
... <similar messages for all the processing steps>
2018-02-09 12:10:55,509 [ERROR] from o.a.f.s.r.t.StreamTask in PartialAggregations-> Sink: CassandraSink (1/8) - Error during disposal of stream operator.
java.lang.InterruptedException: null <because it's interrupting a future>
... <for all of my sinks - these are custom, not the flink cassandra connectors>
The first INFO message comes from my source, which reads data from S3 and emits it into Flink.
After that, the first error is produced here: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java#L137
and the warnings are produced here: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java#L115
The last error comes from my code, but it is triggered by Flink trying to tear down the job, so it should not be the root cause of the failure.
I can provide additional information, but I'm not sure what would be relevant.
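The first error seems to start the whole cascade that ends in the crash. How is it possible for the JobSubmissionClientActor to receive a LeaderSessionMessage? What kind of messages does the JobSubmissionClientActor expect when Flink runs embedded? As far as I can tell, every message it can handle is about job submission. Should this even be possible in embedded mode? How can I prevent this crash?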
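Update: I think I misread the error log. When I ran the job again, the sequence of events was slightly different. In the previous run I only saw the stream-disposal errors, with no visible reason for the stream to end, because the final error apparently was not written to my log file (although it was printed to stdout). That error is below; the errors before it were similar to the previous run (the errors around disposing the streams).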
[error] Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: JobClientActor seems to have died before the JobExecutionResult could be retrieved.
[error] at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:285)
[error] at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
[error] at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
[error] at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
[error] at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:108)
[error] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
[error] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
[error] at a.b.l.flink.FlinkIngestPrototype$.run(FlinkIngestPrototype.scala:90)
[error] at a.b.l.flink.FlinkIngestPrototype$.main(FlinkIngestPrototype.scala:43)
[error] at a.b.l.flink.FlinkIngestPrototype.main(FlinkIngestPrototype.scala)
[error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
[error] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
[error] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
[error] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
[error] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error] at scala.concurrent.Await$.result(package.scala:190)
[error] at scala.concurrent.Await.result(package.scala)
[error] at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:273)
[error] ... 9 more
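I've traced the execution failure to the following:
- The JobClient object waits for the JobClientActor to finish; if it has not finished within the timeout, the JobClient pings the actor to check whether it is still alive.
- This ping times out, so the JobClient sends a PoisonPill to the JobClientActor, which causes all the different disposal errors.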
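The wait-then-ping behaviour described above can be modelled with plain Scala futures. This is a simplified sketch, not Flink's actual `JobClient` code: `AwaitThenPing` and `awaitWithPing` are hypothetical names, a `() => Future[Unit]` stands in for the Akka ask that Flink uses as a ping, and only `TimeoutException` counts as a failed ping. It shows why a stall longer than the ask timeout (10000 ms in the trace) is enough to declare the client of a still-running job dead.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical, simplified model of the client-side wait loop (not Flink code).
object AwaitThenPing {
  sealed trait Outcome
  case object JobFinished extends Outcome
  case object DeclaredDead extends Outcome

  /** Waits for `jobResult` in slices of `askTimeout`. After each slice that ends
    * without a result, waits up to `askTimeout` for `ping()` to answer. If the
    * ping also times out while the job is still running, the client is declared
    * dead (the point where Flink sends a PoisonPill and throws). */
  def awaitWithPing(jobResult: Future[Unit], ping: () => Future[Unit], askTimeout: FiniteDuration): Outcome = {
    while (!jobResult.isCompleted) {
      try {
        Await.ready(jobResult, askTimeout)
      } catch {
        case _: TimeoutException =>
          val alive =
            try { Await.result(ping(), askTimeout); true }
            catch { case _: TimeoutException => false }
          if (!alive && !jobResult.isCompleted) return DeclaredDead
      }
    }
    JobFinished
  }

  def main(args: Array[String]): Unit = {
    val timeout = 200.millis
    // Healthy: the job needs several timeout slices, but every ping is answered at once.
    val slowJob = Future { Thread.sleep(500) }
    println(awaitWithPing(slowJob, () => Future.successful(()), timeout)) // prints JobFinished
    // Stalled: the job never finishes and the ping answers only after 1 s,
    // standing in for an actor system frozen by a long pause.
    val stuckJob = Promise[Unit]().future
    println(awaitWithPing(stuckJob, () => Future { Thread.sleep(1000) }, timeout)) // prints DeclaredDead
  }
}
```

Note that the job itself is never inspected: a single late ping is enough, which matches the observation that a healthy but temporarily frozen process gets torn down.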
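Since then I have run into more cases of futures being interrupted by short timeouts in a non-deterministic way. I dug into the issue a bit, and I think the cause is long GC pauses (or something similar). Here is an illustration of how the timeouts line up with the GC pauses: https://i.stack.imgur.com/izbab.jpg. I think this could be what causes the timeouts. This is my GC configuration: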
"-XX:-UseParallelGC",
"-XX:-UseConcMarkSweepGC",
"-XX:+UseG1GC",
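According to most sources, this should result in very short GC pauses (under one second). Does anyone have experience with long GC pauses in Flink? Could this be a hardware-related issue? I'm running the application on an AWS EC2 instance.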
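One way to check whether the stalls really coincide with GC is to measure them from inside the JVM. The sketch below is hypothetical (`PauseMonitor` and its methods are illustrative names, not part of Flink): it sums the collection counts and times reported by the standard `GarbageCollectorMXBean`s and runs a simple sleep-overshoot watchdog, which catches any stall of the process, whether from GC or from a starved host. On JDK 8, the flags `-XX:+PrintGCDetails`, `-XX:+PrintGCDateStamps`, `-XX:+PrintGCApplicationStoppedTime` and `-Xloggc:<file>` give the same information per individual pause.

```scala
import java.lang.management.ManagementFactory

// Hypothetical helper, not part of Flink: measures GC activity and JVM stalls.
object PauseMonitor {
  /** Sums (collection count, collection time in ms) over all GC MXBeans. */
  def gcTotals(): (Long, Long) = {
    val beans = ManagementFactory.getGarbageCollectorMXBeans()
    var count = 0L
    var timeMs = 0L
    for (i <- 0 until beans.size()) {
      val bean = beans.get(i)
      // Both getters return -1 when the value is undefined for a collector.
      count += math.max(bean.getCollectionCount(), 0L)
      timeMs += math.max(bean.getCollectionTime(), 0L)
    }
    (count, timeMs)
  }

  /** Sleeps `intervalMs` `iterations` times and returns the worst overshoot in ms.
    * A large overshoot means the thread could not run on time, e.g. because of
    * a stop-the-world GC pause. */
  def maxStallMs(intervalMs: Long, iterations: Int): Long = {
    var worst = 0L
    for (_ <- 1 to iterations) {
      val start = System.nanoTime()
      Thread.sleep(intervalMs)
      val elapsedMs = (System.nanoTime() - start) / 1000000L
      worst = math.max(worst, elapsedMs - intervalMs)
    }
    worst
  }

  def main(args: Array[String]): Unit = {
    val (count0, time0) = gcTotals()
    // Allocate on a background thread so some collections happen during the measurement.
    val allocator = new Thread(new Runnable {
      def run(): Unit = {
        var keep = List.empty[Array[Byte]]
        for (i <- 1 to 200000) {
          keep = new Array[Byte](1024) :: keep
          if (i % 10000 == 0) keep = Nil
        }
      }
    })
    allocator.start()
    val stall = maxStallMs(intervalMs = 10, iterations = 100)
    allocator.join()
    val (count1, time1) = gcTotals()
    println(s"collections: ${count1 - count0}, GC time: ${time1 - time0} ms, worst stall: $stall ms")
  }
}
```

In a real job the watchdog would run on its own daemon thread for the whole run; a worst stall approaching 10 s would support the GC hypothesis.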
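As you said, this is a GC-pause problem. The things I have tried to solve this kind of problem are:
- Reduce the job's memory requirements
- Increase the memory available to the system
- Increase the heartbeat timeout, so the cluster doesn't crash after a long pause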
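For the last point, a possible sketch for the embedded (local) environment, assuming Flink 1.4-era configuration keys: `akka.ask.timeout` (default `10 s`, which appears to be the source of the `10000 milliseconds` in the stack trace) and `akka.watch.heartbeat.pause`. The object name `LocalEnvWithLongerTimeouts` is hypothetical, the values are illustrative rather than recommendations, and both keys should be checked against the configuration reference for the Flink version in use. It requires Flink's Scala streaming API on the classpath.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

// Hypothetical example: a local environment with longer Akka timeouts.
object LocalEnvWithLongerTimeouts {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Assumption: this key controls the 10 s wait/ping timeout seen in the trace.
    conf.setString("akka.ask.timeout", "60 s")
    // How long missing Akka heartbeats are tolerated before a peer is considered dead.
    conf.setString("akka.watch.heartbeat.pause", "120 s")

    // Parallelism 8 mirrors the "(7/8)" subtasks in the log.
    val env = StreamExecutionEnvironment.createLocalEnvironment(8, conf)
    env.fromElements(1, 2, 3).map(_ * 2).print()
    env.execute("timeout-config-sketch")
  }
}
```

Longer timeouts only hide the pauses; the first two points (less memory pressure, more available memory) address their cause.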