Given the following series of events:
df1 = read
df2 = df1.action
df3 = df1.action
df2a = df2.action
df2b = df2.action
df3a = df3.action
df3b = df3.action
df4 = union(df2a, df2b, df3a, df3b)
df4.collect()
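For concreteness, here is a minimal runnable PySpark sketch of the same event series. The parquet path and the filter/withColumn steps are hypothetical stand-ins for the .action steps above; note they are all lazy transformations, and only the final collect() executes anything:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("fork-example").getOrCreate()

df1 = spark.read.parquet("/data/events")    # hypothetical source
df2 = df1.filter(F.col("kind") == "a")      # stand-in for df1.action
df3 = df1.filter(F.col("kind") == "b")
df2a = df2.withColumn("tag", F.lit("2a"))   # stand-in for df2.action
df2b = df2.withColumn("tag", F.lit("2b"))
df3a = df3.withColumn("tag", F.lit("3a"))
df3b = df3.withColumn("tag", F.lit("3b"))
df4 = df2a.union(df2b).union(df3a).union(df3b)
df4.collect()  # the only action; each of the 4 leaves recomputes its lineage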
The data forks twice, so df1 will be read four times: when df4.collect() runs, each of the four leaf dataframes (df2a, df2b, df3a, df3b) recomputes its full lineage back to the source. I therefore want to persist the data. As I understand it, that should be done like this:
df1 = read
df1.persist()
df2 = df1.action
df3 = df1.action
df2.persist()
df3.persist()
df2a = df2.action
df2b = df2.action
df3a = df3.action
df3b = df3.action
df4 = union(df2a, df2b, df3a, df3b)
df4.collect()
df1.unpersist()
df2.unpersist()
df3.unpersist()
However, this keeps all three dataframes in memory at the same time, which is not storage-efficient given that I no longer need df1 persisted once df2 and df3 have been created. I would like to order it like this:
df1 = read
df1.persist()
df2 = df1.action
df3 = df1.action
df1.unpersist()
df2.persist()
df3.persist()
df2a = df2.action
df2b = df2.action
df2.unpersist()
df3a = df3.action
df3b = df3.action
df3.unpersist()
df4 = union(df2a, df2b, df3a, df3b)
df4.collect()
However, this just results in the data not being persisted at all, since I would need to trigger an action before unpersisting. Is there any way to achieve what I am looking for (unpersisting intermediate dataframes in the middle of the execution plan)?
This is not possible as written, but the steps can be rearranged slightly.
Transformations only build the DAG without executing anything; the actual persisting happens when an action triggers execution. Furthermore, when a cached parent RDD is unpersisted, all dependent cached RDDs are unpersisted along with it. This is a design choice that favors data correctness and consistency, and it is why your data never gets persisted at all: every unpersist() in your ordering runs before the single action does.
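A short sketch of that behavior, under the same hypothetical source as before: unpersist() removes the cache mark immediately, while persist() only takes effect at the next action, so unpersisting before the action leaves nothing cached.

from pyspark.sql import functions as F  # same SparkSession `spark` as above

df1 = spark.read.parquet("/data/events")  # hypothetical source
df1.persist()                             # mark only; nothing cached yet
df2 = df1.filter(F.col("kind") == "a")    # lazy: extends the DAG
df1.unpersist()                           # mark removed before any action ran
df2.count()                               # executes now; df1 is re-read from
                                          # the source, its cache mark is gone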
Improving your steps slightly:
df1 = read
df1.persist()
df2 = df1.action # after this df1 will be persisted
df3 = df1.action # this will be faster as df1 is cached
df2.persist()
df3.persist()
# perform one action on each of df2 and df3 to trigger their caching
df2a = df2.action
df3a = df3.action
df2b = df2.action # this will be faster as df2 is cached
df3b = df3.action # this will be faster as df3 is cached
df4 = union(df2a, df2b, df3a, df3b)
df4.collect()
df1.unpersist() # this, along with its cached dependents, will get unpersisted
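Put into runnable PySpark (same hypothetical transformations as before, with count() as the materializing action; any cheap action would do):

from pyspark.sql import functions as F  # same SparkSession `spark` as above

df1 = spark.read.parquet("/data/events")   # hypothetical source
df1.persist()
df2 = df1.filter(F.col("kind") == "a")
df3 = df1.filter(F.col("kind") == "b")
df2.persist()
df3.persist()
df2.count()    # fills df1's cache (first touch) and df2's cache
df3.count()    # fills df3's cache, reading df1 from its cache
df2a = df2.withColumn("tag", F.lit("2a"))
df2b = df2.withColumn("tag", F.lit("2b"))
df3a = df3.withColumn("tag", F.lit("3a"))
df3b = df3.withColumn("tag", F.lit("3b"))
df4 = df2a.union(df2b).union(df3a).union(df3b)
df4.collect()   # served from the df2/df3 caches
df1.unpersist() # also drops the dependent cached plans

Note that df1's cache cannot be dropped any earlier (say, right after the two count() calls): as explained above, unpersisting the parent cascades to its cached dependents, which would defeat the caching of df2 and df3.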
Related references:
- https://github.com/apache/spark/pull/17097
- https://issues.apache.org/jira/browse/SPARK-21579