在 pyspark collect_list() 执行期间"java.lang.OutOfMemoryError: Requested array size exceeds VM limit"



我有一个大约5000万行的大数据集,大约有40列的浮子。

出于自定义转换原因,我尝试使用 pyspark collect_list()函数收集所有float值,并使用以下伪模:

for column in columns:
   set_values(column, df.select(collect_list(column)).first()[0])

对于每列,它执行collect_list()函数并将值设置为其他内部结构。

我正在运行上述独立群集,其中2个主机由8个内核和64 GB RAM,每位主机分配最大30 GB和6个核心,我在执行过程中得到以下例外,我怀疑它必须与收集的阵列的大小进行。

java.lang.outofmemoryerror:请求的数组大小超过vm limit

我尝试了spark-defaults.conf中的多种配置,包括分配更多内存,分区号,并行性甚至Java选项,但仍然没有运气。

因此,我的假设是collect_list()执行人/驱动程序资源与较大数据集中的资源深深相关,或者与这些数据无关?

我可以使用任何设置来帮助我消除此问题,否则我必须使用collect()功能?

collect_list不仅比在您的情况下调用collect好。对于大型数据集来说,两者都是非常坏的主意。

几乎没有实际应用。

都需要与记录数成比例的内存量,而collect_list只是添加了Shuffle的开销。

换句话说 - 如果您没有选择,并且需要本地结构,请使用selectcollect并增加驱动程序内存。它不会使事情变得更糟:

df.select(column).rdd.map(lambda x: x[0]).collect()

相关内容

  • 没有找到相关文章

最新更新