似乎有一些关于这方面的帖子,但似乎没有一条能回答我的理解。
以下代码在DataBricks上运行:
spark.sparkContext.setCheckpointDir("/dbfs/FileStore/checkpoint/cp1/loc7")
val checkpointDir = spark.sparkContext.getCheckpointDir.get
val ds = spark.range(10).repartition(2)
ds.cache()
ds.checkpoint()
ds.count()
ds.rdd.isCheckpointed
添加了某种改进:
...
val ds2 = ds.checkpoint(eager=true)
println(ds2.queryExecution.toRdd.toDebugString)
...
退货:
(2) MapPartitionsRDD[307] at toRdd at command-1835760423149753:13 []
| MapPartitionsRDD[305] at checkpoint at command-1835760423149753:12 []
| ReliableCheckpointRDD[306] at checkpoint at command-1835760423149753:12 []
checkpointDir: String = dbfs:/dbfs/FileStore/checkpoint/cp1/loc10/86cc77b5-27c3-4049-9136-503ddcab0fa9
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
res53: Boolean = false
问题1:
ds.rdd.isCheckpointed或ds2.rdd.isCheckpointed者都返回False,即使有count,我也有非懒惰的情况。为什么,尤其是/loc 7&10是用(部分)文件编写的?我们还可以看到ReliableCheckPoint!
没有很好地解释整个概念。试图解决这个问题。
问题2-次要问题:
最新版本的Spark 2.4是否真的需要缓存?ds上的一个新分支,如果不缓存,会导致重新计算吗?还是现在更好了?似乎很奇怪,检查点数据不会被使用,或者我们可以说Spark真的不知道什么更好?
从High Performance Spark中,我得到了一个混合的印象,即检查点不太推荐,但后来又是这样。
TL;DR:您不检查实际检查点的对象:
ds2.queryExecution.toRdd.dependencies(0).rdd.isCheckpointed
// Boolean = true
ds.rdd.isCheckpointed或ds2.rdd.isCheckpointed均返回False
这是预期的行为。检查点对象不是您引用的转换后的RDD(这是转换为外部表示所需的额外转换的结果),而是内部RDD对象(事实上,正如您在上面看到的,它甚至不是最新的内部RDD,而是它的父对象)。
此外,在第一种情况下,您只是使用了一个错误的Dataset
对象——正如链接答案中所解释的,Dataset.checkpoint
返回一个新的Dataset
即使有计数,我也有一个非懒惰的情况
这没有多大意义。默认的checkpoint
实现是eager
,因此它强制求值。即使不是这样,Dataset.count
也不是强制评估的正确方式。
最新版本的是否真的需要缓存
正如您在链接源中看到的,Dataset.checkpoint
在内部使用RDD.checkpoint
,因此适用相同的规则。然而,您已经执行了一个单独的操作来强制检查点,因此额外的缓存,特别是考虑到Dataset
持久性的成本,可能会有些过头了。
当然,如果有疑问,您可以考虑在特定的环境中进行基准测试。