我在SO上看到过几个关于缓存sql表的问题,但似乎没有一个能完全回答我的问题。
来自查询的结果数据帧(来自 sqlContext.sql("..."((似乎不像常规数据帧那样可缓存。
下面是一些示例代码(Spark 2.2(:
import org.apache.spark.sql._
def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined
val df = List("1", "2", "3").toDF.cache
df.show
isCached(df) // 1) Here, isCached returns 'true'
df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")
val df2 = spark.sqlContext.sql("select value, count(*) from myTable group by value").cache
df2.show
isCached(df2) // 2) (???) returns 'false'
val df3 = spark.sqlContext.sql("select value, 'a', count(*) from myTable group by value")
df3.registerTempTable("x")
spark.sqlContext.cacheTable("x")
df3.show
spark.catalog.isCached("x") // Returns 'true'
isCached(df3) // 3) (???) Returns 'false'
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // 4) (???) Returns 'false'
Spark UI显示了一些与df2关联的存储,但它似乎与df相关联。 通常,我们.cache()
执行,然后.count()
来实现,然后在不再需要时unpersist
父数据帧。 在此示例中,当取消持久化df
时,在 Spark UI 中看到的 df2
和df3
存储也会消失。
那么我们如何让 (2(、(3( 或最重要的 (4( 返回 true 呢?
一段时间后,我认为发布我问题的答案可能会很有用。
诀窍是使用新数据帧截断关系谱系。
为此,我称spark.createDataFrame(df.rdd, df.schema).cache()
.其他人建议调用rdd.cache.count
但这似乎比在不实现底层 rdd 的情况下创建一个新 rdd 效率低得多。
import org.apache.spark.sql._
def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined
val df = List("1", "2", "3").toDF.cache
df.count // cache the df.
isCached(df) // 1) Here, isCached returns 'true'
df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")
val df2Temp = spark.sqlContext.sql("select value, count(*) from myTable group by value")
// truncate lineage and materialize new dataframe
val df2Cached = spark.createDataFrame(df2Temp.rdd, df2Temp.schema).cache
df2Cached.count
isCached(df2Cached) // 2) returns 'true'
df2Cached.createOrReplaceTempView("x")
// Still cached
isCached(df)
spark.catalog.isCached("myTable")
// parent df not needed anymore
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // Still cached