如何缓存从 sql 查询生成的数据帧



我在SO上看到过几个关于缓存sql表的问题,但似乎没有一个能完全回答我的问题。

来自查询的结果数据帧(来自 sqlContext.sql("..."((似乎不像常规数据帧那样可缓存。

下面是一些示例代码(Spark 2.2(:

import org.apache.spark.sql._
def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined
val df = List("1", "2", "3").toDF.cache
df.show
isCached(df) // 1) Here, isCached returns 'true'
df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")
val df2 = spark.sqlContext.sql("select value, count(*) from myTable group by value").cache
df2.show
isCached(df2) // 2) (???) returns 'false'
val df3 = spark.sqlContext.sql("select value, 'a', count(*) from myTable group by value")
df3.registerTempTable("x")
spark.sqlContext.cacheTable("x")
df3.show
spark.catalog.isCached("x") // Returns 'true'
isCached(df3) // 3) (???) Returns 'false'
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // 4) (???) Returns 'false'

Spark UI显示了一些与df2关联的存储,但它似乎与df相关联。 通常,我们.cache()执行,然后.count()来实现,然后在不再需要时unpersist父数据帧。 在此示例中,当取消持久化df时,在 Spark UI 中看到的 df2df3存储也会消失。

那么我们如何让 (2(、(3( 或最重要的 (4( 返回 true 呢?

一段时间后,我认为发布我问题的答案可能会很有用。

诀窍是使用新数据帧截断关系谱系。

为此,我称spark.createDataFrame(df.rdd, df.schema).cache().其他人建议调用rdd.cache.count但这似乎比在不实现底层 rdd 的情况下创建一个新 rdd 效率低得多。

import org.apache.spark.sql._
def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined
val df = List("1", "2", "3").toDF.cache
df.count // cache the df.
isCached(df) // 1) Here, isCached returns 'true'
df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")
val df2Temp = spark.sqlContext.sql("select value, count(*) from myTable group by value")
// truncate lineage and materialize new dataframe
val df2Cached = spark.createDataFrame(df2Temp.rdd, df2Temp.schema).cache
df2Cached.count
isCached(df2Cached) // 2) returns 'true'
df2Cached.createOrReplaceTempView("x")
// Still cached
isCached(df) 
spark.catalog.isCached("myTable")
// parent df not needed anymore
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // Still cached

相关内容

  • 没有找到相关文章

最新更新