我正在做这样的事情:
import pandas as pd
pdf = pd.DataFrame({
'a': [1, 2, 3],
'b': ['a', 'b', 'c']
})
parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()
child_df = parent_df.replace('c', 'x')
child_df.cache().count()
parent_df.unpersist()
本质上,我想缓存parent_df
,因为在接下来的步骤中,我将对其进行一些繁重的转换。一旦我完成了这些并且我回到child_df
,我不再需要parent_df
,因此想从缓存中释放它。但是,这样做也会取消新缓存的child_df
!
所以很明显,问题是:
- 为什么会这样?
- 如何完成我想要的(从缓存中释放
parent_df
,同时将新child_df
保留在缓存中(?
有趣的是,相反的情况有效 - 即如果我取消持久化child_df
而不是parent_df
最后一行,则parent_df
将按预期保持缓存,而child_df
将被释放。
PS:我在这里发现了一个类似的问题 了解Spark的缓存 .但是,在这种情况下,该答案似乎不起作用,因为在这里我们已经在缓存后立即调用了操作(.count()
(。
这是一个基于数据一致性的有意识的设计决策。取消持久化父级的一个可能原因是您希望其源数据发生更改。让父母使用新数据,而明显的孩子使用旧数据,可能会导致意外和不一致的结果。因此,当父级无效时,父级的任何缓存子项都将失效。
在实施此更改的 PR 中以及在引入更改后的此错误报告中进行了一些讨论。
如第二个链接中所述,如果您确实需要持久化子项,则可以通过使用 saveAsTable
将其具体化为表来实现。
,我想我找到了一个解决方案:
首先,我对为什么会发生这种情况的猜测是,
parent_df
缓存点是child_df
的血统的一部分。 即,即使child_df
使用较新的缓存点,其DAG仍包含parent_df
中的较早位。因此,删除该缓存点会以某种方式影响后面的缓存点。至于如何防止这种情况发生,请做好以下工作:
import pandas as pd
pdf = pd.DataFrame({
'a': [1, 2, 3],
'b': ['a', 'b', 'c']
})
parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()
# this is the relevant line
child_df = spark.createDataFrame(parent_df.rdd, schema=parent_df.schema)
child_df = child_df.replace('c', 'x')
child_df.cache().count()
parent_df.unpersist()
在相关行(标有注释(发生的情况是,child_df
的世系被削减,不包括与parent_df
对应的部分,并以"新的RDD"开头。取消持久化parent_df
,child_df
的世系不受影响。
再说一次 - 即使这似乎有效,我也欢迎对这个理论的更多解释/确认作为公认的答案!