我遇到了Spark,它的驱动程序和OoM问题。
目前我有一个数据框架,它正在与几个,连接源(实际上不同的表在拼花格式),并有成千上万的元组构建。它们有一个日期,表示记录的创建日期,显然它们是少数。
我做了以下事情:
from pyspark.sql.functions import year, month
# ...
selectionRows = inputDataframe.select(year('registration_date').alias('year'), month('registration_date').alias('month')).distinct()
selectionRows.show() # correctly shows 8 tuples
selectionRows = selectionRows.collect() # goes heap space OoM
print(selectionRows)
读取内存消耗统计数据显示驱动程序不超过~60%。我认为驱动程序应该只加载不同的子集,而不是整个数据帧。
我错过了什么吗?是否有可能以更智能的方式收集这几行数据?我需要它们作为下推谓词来加载辅助数据框。
非常感谢!
EDIT/SOLUTION
在阅读了评论并详细说明了我的个人需求之后,我缓存了每个"join/elaboration"的数据框架。步骤,以便在时间轴中执行以下操作:
- 连接已加载的表
- 队列需要的转换
- 应用缓存转换
- 打印计数以跟踪基数(主要用于跟踪/调试目的),从而应用所有转换+缓存
- 取消前一步的缓存,如果有的话(tick/tock范例)
这将一些复杂的ETL作业减少到原始时间的20%(因为以前它在每个计数时应用前面每个步骤的转换)。
经验教训:)
在阅读了评论之后,我为我的用例详细阐述了解决方案。
正如问题中提到的,我将几个表连接在一个"目标数据框架"中,并且在每次迭代中执行一些转换,如下所示:
# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
print(f'Target after table "other": {target.count()}')
慢/OoM的问题是,由于结束的count
, Spark被迫在每个表上从头到尾完成所有转换,使每个表/迭代变得越来越慢。
我发现的解决方案是在每次迭代时缓存数据帧,如下所示:
cache: DataFrame = null
# ...
# n-th table work
target = target.join(other, how='left')
target = target.filter(...)
target = target.withColumn('a', 'b')
target = target.select(...)
target = target.cache()
target_count = target.count() # actually do the cache
if cache:
cache.unpersist() # free the memory from the old cache version
cache = target
print(f'Target after table "other": {target_count}')