我有一个大约5000万行的大数据集,大约有40列的浮子。
出于自定义转换原因,我尝试使用 pyspark 的collect_list()
函数收集所有float值,并使用以下伪模:
for column in columns:
set_values(column, df.select(collect_list(column)).first()[0])
对于每列,它执行collect_list()
函数并将值设置为其他内部结构。
我正在运行上述独立群集,其中2个主机由8个内核和64 GB RAM,每位主机分配最大30 GB和6个核心,我在执行过程中得到以下例外,我怀疑它必须与收集的阵列的大小进行。
java.lang.outofmemoryerror:请求的数组大小超过vm limit
我尝试了spark-defaults.conf
中的多种配置,包括分配更多内存,分区号,并行性甚至Java选项,但仍然没有运气。
因此,我的假设是collect_list()
与执行人/驱动程序资源与较大数据集中的资源深深相关,或者与这些数据无关?
我可以使用任何设置来帮助我消除此问题,否则我必须使用collect()
功能?
collect_list
不仅比在您的情况下调用collect
好。对于大型数据集来说,两者都是非常坏的主意。
都需要与记录数成比例的内存量,而collect_list
只是添加了Shuffle的开销。
换句话说 - 如果您没有选择,并且需要本地结构,请使用select
和collect
并增加驱动程序内存。它不会使事情变得更糟:
df.select(column).rdd.map(lambda x: x[0]).collect()