Spark Scala检查点数据集在Action之后显示.isCheckpointed=false,但写入了目录



似乎有一些关于这方面的帖子,但似乎没有一条能回答我的理解。

以下代码在DataBricks上运行:

spark.sparkContext.setCheckpointDir("/dbfs/FileStore/checkpoint/cp1/loc7")
val checkpointDir = spark.sparkContext.getCheckpointDir.get
val ds = spark.range(10).repartition(2)
ds.cache()
ds.checkpoint()
ds.count()
ds.rdd.isCheckpointed  

添加了某种改进:

...
val ds2 = ds.checkpoint(eager=true)
println(ds2.queryExecution.toRdd.toDebugString)
...

退货:

(2) MapPartitionsRDD[307] at toRdd at command-1835760423149753:13 []
|  MapPartitionsRDD[305] at checkpoint at command-1835760423149753:12 []
|  ReliableCheckpointRDD[306] at checkpoint at command-1835760423149753:12 []
checkpointDir: String = dbfs:/dbfs/FileStore/checkpoint/cp1/loc10/86cc77b5-27c3-4049-9136-503ddcab0fa9
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
res53: Boolean = false

问题1:

ds.rdd.isCheckpointedds2.rdd.isCheckpointed者都返回False,即使有count,我也有非懒惰的情况。为什么,尤其是/loc 7&10是用(部分)文件编写的?我们还可以看到ReliableCheckPoint!

没有很好地解释整个概念。试图解决这个问题。

问题2-次要问题:

最新版本的Spark 2.4是否真的需要缓存?ds上的一个新分支,如果不缓存,会导致重新计算吗?还是现在更好了?似乎很奇怪,检查点数据不会被使用,或者我们可以说Spark真的不知道什么更好?

从High Performance Spark中,我得到了一个混合的印象,即检查点不太推荐,但后来又是这样。

TL;DR:您不检查实际检查点的对象:

ds2.queryExecution.toRdd.dependencies(0).rdd.isCheckpointed
// Boolean = true

ds.rdd.isCheckpointed或ds2.rdd.isCheckpointed均返回False

这是预期的行为。检查点对象不是您引用的转换后的RDD(这是转换为外部表示所需的额外转换的结果),而是内部RDD对象(事实上,正如您在上面看到的,它甚至不是最新的内部RDD,而是它的父对象)。

此外,在第一种情况下,您只是使用了一个错误的Dataset对象——正如链接答案中所解释的,Dataset.checkpoint返回一个新的Dataset

即使有计数,我也有一个非懒惰的情况

这没有多大意义。默认的checkpoint实现是eager,因此它强制求值。即使不是这样,Dataset.count也不是强制评估的正确方式。

最新版本的是否真的需要缓存

正如您在链接源中看到的,Dataset.checkpoint在内部使用RDD.checkpoint,因此适用相同的规则。然而,您已经执行了一个单独的操作来强制检查点,因此额外的缓存,特别是考虑到Dataset持久性的成本,可能会有些过头了。

当然,如果有疑问,您可以考虑在特定的环境中进行基准测试。

相关内容

  • 没有找到相关文章