我的理解是,如果我有一个dataframe,如果我cache()它并触发诸如 df.take(1)
或df.count()之类的操作,它应该计算数据框并将其保存在内存中,并且每当该程序中调用该缓存的数据框时,它都会使用缓存中已经计算的数据框。
但这不是我的程序的工作方式。
我在下面有一个数据框架,然后立即运行df.count
操作。
-
val df = inputDataFrame.select().where().withColumn("newcol" , "").cache()
-
df.count
运行程序时。在Spark UI中,我看到第一行运行4分钟,并且当涉及第二行时,它再次运行4分钟,基本上是重新计算的两次?
第二行触发时不应该计算和缓存吗?
如何解决这种行为。我被卡住了,请建议。
我的理解是,如果我有一个数据框,如果我缓存()并触发诸如df.take(1)或df.count()之类的操作,则应计算数据框并将其保存在内存中,
这是不正确的。简单的cache
和count
(take
也无法在RDD上使用)是RDD的有效方法,但Datasets
并非如此,它使用了更高级的优化。与查询:
df.select(...).where(...).withColumn("newcol" , "").count()
where
子句中未使用的任何列都可以忽略。
在开发人员列表上有一个重要的讨论,并引用了Sean Owen
我认为正确的答案是"不要这样",但是如果您真的必须触发每个分区无助的数据集操作。我认为这会更可靠,因为必须计算整个分区才能实践中可用。或者,甚至循环遍历每个元素。
转换为代码:
df.foreach(_ => ())
有
df.registerAsTempTable("df")
sqlContext.sql("CACHE TABLE df")
渴望的,但不再(火花2和向前)记录,应避免。
no,如果您在数据框架上调用cache
,则在这一刻没有缓存,它只是为潜在的未来缓存而"标记"。实际的缓存只有在以后进行操作时才能完成。您还可以在"存储"下的Spark UI中看到您的缓存数据帧
代码中的另一个问题是,数据框架上的count
不会计算整个数据框,因为并非需要为此计算所有列。您可以使用df.rdd.count()
强制迫使整个验证(请参阅如何强制Spark中的数据帧评估)。
问题是为什么您的第一个操作需要这么长时间,即使没有调用任何操作。我认为这与调用缓存时要计算的缓存逻辑(例如尺寸估计等)有关(例如,请参见RDD.map(Identity)。