executor.CoarseGrainedExecutorBackend (Logging.scala:logInfo(57)

I get the following logs from the executor (reading from the bottom up):

2021-11-30 18:44:42,911 INFO  [shutdown-hook-0] util.ShutdownHookManager (Logging.scala:logInfo(57)) - Deleting directory /var/data/spark-0646270c-a2d0-47d4-8e6c-0bc735bc255d/spark-a54cf7e4-baaf-4411-9073-0c1fb1e4cc5b
2021-11-30 18:44:42,910 INFO  [shutdown-hook-0] util.ShutdownHookManager (Logging.scala:logInfo(57)) - Shutdown hook called
2021-11-30 18:44:42,902 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalUtils.scala:$anonfun$registerLogger$2(43)) - RECEIVED SIGNAL TERM
2021-11-30 18:44:42,823 INFO  [CoarseGrainedExecutorBackend-stop-executor] storage.BlockManager (Logging.scala:logInfo(57)) - BlockManager stopped
2021-11-30 18:44:42,822 INFO  [CoarseGrainedExecutorBackend-stop-executor] memory.MemoryStore (Logging.scala:logInfo(57)) - MemoryStore cleared
2021-11-30 18:44:42,798 INFO  [dispatcher-Executor] executor.CoarseGrainedExecutorBackend (Logging.scala:logInfo(57)) - Driver commanded a shutdown

How can I get some kind of logging on the Spark driver that would tell me which event on the driver actually triggered the executor shutdown? Neither the driver nor the executors are short of memory; the pod metrics show them staying well below the limit + overhead. So the shutdown signal does not seem to be caused by resource pressure, but by some hidden exception that is not logged anywhere.
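One approach I am considering for surfacing this on the driver is to register a SparkListener and log the reason reported in onExecutorRemoved. A minimal sketch, assuming spark is the active SparkSession; the ExecutorRemovalLogger class name is just illustrative:

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

// Illustrative listener: logs the driver-side reason whenever an executor is removed.
class ExecutorRemovalLogger extends SparkListener {
  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit = {
    // `reason` is the driver's own explanation for removing the executor.
    println(s"Executor ${removed.executorId} removed at ${removed.time}: ${removed.reason}")
  }
}

// Register it on the driver right after the session is created.
spark.sparkContext.addSparkListener(new ExecutorRemovalLogger)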

Following @mazaneicha's suggestion, I tried setting longer timeouts, but I still get the same error:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession
  .builder
  .master("local[1]")
  .config(new SparkConf().setIfMissing("spark.master", "local[1]")
    .set("spark.eventLog.dir", "file:///tmp/spark-events")
    // keep idle executors around longer before dynamic allocation removes them
    .set("spark.dynamicAllocation.executorIdleTimeout", "100s")
    // wait longer on a task backlog before requesting new executors
    .set("spark.dynamicAllocation.schedulerBacklogTimeout", "100s")
  )
  .getOrCreate()

The actual reason for the failure did get logged:

2021-12-01 15:05:46,906 WARN  [main] streaming.StreamingQueryManager (Logging.scala:logWarning(69)) - Stopping existing streaming query [id=b13a69d7-5a2f-461e-91a7-a9138c4aa716, runId=9cb31852-d276-42d8-ade6-9839fa97f85c], as a new run is being started.

Why was the query being stopped? Because in my Scala code I was creating the streaming queries in a loop over a collection, while keeping every query name and every checkpoint location identical. After making them unique (I simply used the string value from the collection), the failures disappeared.
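To illustrate the fix, here is a rough sketch of the shape of that loop, assuming the implicit spark session from above. The names topics, basePath, and process are placeholders for my actual collection and processing logic, and the rate source / console sink are only there to keep the example self-contained; the important part is that both queryName and checkpointLocation are derived from the element, so every query gets its own:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Placeholder inputs standing in for the real collection and transformation.
val topics = Seq("orders", "payments", "shipments")
val basePath = "/tmp/checkpoints"
def process(topic: String)(implicit spark: SparkSession): DataFrame =
  spark.readStream.format("rate").load()  // placeholder streaming source

val queries = topics.map { topic =>
  process(topic)
    .writeStream
    .queryName(s"query-$topic")                         // unique per element
    .option("checkpointLocation", s"$basePath/$topic")  // unique per element
    .format("console")
    .start()
}

// Block until any of the started queries terminates.
spark.streams.awaitAnyTermination()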

Latest update