我是Spark的新手,我很惊讶有些结果没有重新计算,尽管我没有(至少我不想)缓存它们,即我必须重新启动sbt才能看到更新的值。
以下是相关的代码片段:
val df: DataFrame = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql://dbHost:5432/tests?user=simon&password=password",
"dbtable" -> "events")
).load()
val cached = df.cache()
val tariffs = cached.map(row => row.getAs[Int](2))
如果我打印tariffs.toDF().mean()
我会得到正确的平均值,但如果我将代码更改为:
val tariffs = cached.map(row => 0)
在重新启动 sbt 之前,我看不到新的平均(0)
。如何避免这种行为?
我看不到您的整个代码,因此我无法确定回答,但是,如果以下代码产生相同的输出,您应该在 https://issues.apache.org/jira/browse/spark 提交错误报告
println(cached.map(row => row.getInt(2)).toDF().mean().collect(0))
println(cached.map(row => 0).toDF().mean().collect(0))
但是,如果它们产生不同的输出,那么您的 REPL 会话很可能有问题。
更一般地说,要消除缓存的影响,请使用
cached.unpersist()