如何理解 Spark 中使用了缓存?



在我的Scala/Spark应用程序中,我创建了DataFrame。我计划在整个程序中多次使用此数据帧。这就是为什么我决定为该数据帧使用.cache()方法。正如您在循环中看到的那样,我使用不同的值多次过滤数据帧。出于某种原因.count()该方法向我返回始终相同的结果。实际上,它必须返回两个不同的计数值。另外,我注意到梅索斯的奇怪行为。感觉.cache()方法没有被执行。创建数据帧后,程序将转到代码if (!df.head(1).isEmpty)的这一部分并执行很长时间。我假设缓存过程将运行很长时间,而其他进程将使用此缓存并快速运行。你认为问题出在哪里?

import org.apache.spark.sql.DataFrame
var df: DataFrame = spark
.read
.option("delimiter", "|")
.csv("/path_to_the_files/")
.filter(col("col5").isin("XXX", "YYY", "ZZZ"))
df.cache()
var array1 = Array("111", "222")
var array2 = Array("333")
var storage = Array(array1, array2)
if (!df.head(1).isEmpty) {
for (item <- storage) {
df.filter(
col("col1").isin(item:_*)
)
println("count: " + df.count())
}
}

事实上,它必须返回两个不同的计数值。

为什么?您在同一df上调用它.也许你的意思是像

val df1 = df.filter(...)
println("count: " + df1.count())

我假设缓存过程将运行很长时间,而其他进程将使用此缓存并快速运行。

它确实如此,但仅当执行依赖于此数据帧的第一个操作时,该操作head。所以你应该期待确切的

程序转到代码if (!df.head(1).isEmpty)的这一部分并执行很长时间

如果没有缓存,你也会得到两个df.count()调用的相同时间,除非 Spark 检测到它并自行启用缓存。

最新更新