当我使用Spark DataFrame执行Action时。缓存数据帧后,执行操作所需的时间和第二次操作所需的时间几乎相同。我的代码如下
logger.info("start to consuming result count")
logger.info(s"consuming ${result.count} output records")
logger.info("starting go to MysqlSink")
logger.info(s"consuming ${result.count} output records")
logger.info("starting go to MysqlSink")
控制台日志如下
18/09/08 14:15:17 INFO MySQLRiskScenarioRunner: start to consuming result count
18/09/08 14:15:49 INFO MySQLRiskScenarioRunner: consuming 5 output records
18/09/08 14:15:49 INFO MySQLRiskScenarioRunner: starting go to MysqlSink
18/09/08 14:16:22 INFO MySQLRiskScenarioRunner: consuming 5 output records
18/09/08 14:16:22 INFO MySQLRiskScenarioRunner: starting go to MysqlSink
所以,第一次是 32 秒,第二次是 33 秒。必须使用 DAG 重新计算数据帧,缓存无效或未处于活动状态。谁能解决我的疑问?
非常感谢。
由于您没有在日志行之前给出代码,因此我只是假设 没有使用对缓存的引用。它应该是这样的,
val dataToCache = ???
val cachedData = dataToCache.cache
使用cachedData
引用,
logger.info("start to consuming result count")
logger.info(s"consuming ${cachedData.count} output records")
logger.info("starting go to MysqlSink")
logger.info(s"consuming ${cachedData.count} output records")
logger.info("starting go to MysqlSink")
希望这有帮助。