getPersistentRDDs returns a Map of cached RDDs and DataFrames in Spark 2.2.0, but in Spark



In Spark 2.2.0, after caching an RDD and a DataFrame, getPersistentRDDs returns a Map of size 2:

scala> val rdd = sc.parallelize(Seq(1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val df = sc.parallelize(Seq(2)).toDF
df: org.apache.spark.sql.DataFrame = [value: int]
scala> spark.sparkContext.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
scala> df.cache
res1: df.type = [value: int]
scala> spark.sparkContext.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(4 -> *SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
MapPartitionsRDD[4] at cache at <console>:27)
scala> rdd.cache
res3: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> spark.sparkContext.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(0 -> ParallelCollectionRDD[0] at parallelize at <console>:24, 4 -> *SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
MapPartitionsRDD[4] at cache at <console>:27)

But in Spark 2.4.7, getPersistentRDDs returns a Map of size 1:

...
scala> spark.sparkContext.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(0 -> ParallelCollectionRDD[0] at parallelize at <console>:24)

What happened to this method that it suddenly started behaving differently? And how can I get all cached objects, not just RDDs?

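The DataFrame is not actually cached in memory yet, because no action has been run on it, so it makes sense to exclude it from the result of getPersistentRDDs. I think the behavior in the later version is actually preferable. However, once you run an action on the DataFrame (for example count()), it will be cached and will appear in the result of getPersistentRDDs, as shown below: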

scala> val df = sc.parallelize(Seq(2)).toDF
df: org.apache.spark.sql.DataFrame = [value: int]
scala> sc.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
scala> df.cache
res1: df.type = [value: int]
scala> sc.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
scala> df.count()
res3: Long = 1
scala> sc.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(3 -> *(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan[obj#1]
MapPartitionsRDD[3] at count at <console>:26)
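As a minimal sketch (assuming a local Spark session on 2.4.x or later; the object name CachedObjectsDemo and the helper describePersistentRDDs are hypothetical), the program below contrasts two views of caching: Dataset.storageLevel, which reports a level as soon as cache() registers the DataFrame, and SparkContext.getPersistentRDDs, which lists only RDDs that have actually been persisted. Per the transcripts above, the "before action" listing should be empty on 2.4.x, whereas on 2.2.0 it would already contain the DataFrame's RDD.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CachedObjectsDemo {
  // Hypothetical helper: lists every RDD the SparkContext currently tracks
  // as persisted, sorted by RDD id, together with its storage level.
  def describePersistentRDDs(spark: SparkSession): Seq[String] =
    spark.sparkContext.getPersistentRDDs.toSeq.sortBy(_._1).map { case (id, rdd) =>
      s"$id -> ${rdd.getStorageLevel.description} : $rdd"
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("cached-objects-demo")
      .getOrCreate()
    import spark.implicits._

    val df = spark.sparkContext.parallelize(Seq(2)).toDF("value")
    df.cache() // only registers the DataFrame with the cache manager

    // Dataset.storageLevel returns StorageLevel.NONE when the Dataset is not
    // marked for caching; otherwise it returns the requested level.
    println(s"df.storageLevel after cache(): ${df.storageLevel.description}")
    println(s"persistent RDDs before action: ${describePersistentRDDs(spark)}")

    df.count() // an action materializes the cached data
    println(s"persistent RDDs after action: ${describePersistentRDDs(spark)}")

    spark.stop()
  }
}
```

Because getPersistentRDDs only sees materialized RDDs, checking df.storageLevel != StorageLevel.NONE is one way to tell whether a particular DataFrame has been marked for caching before any action has run.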
