保存到文件时Dask崩溃



我试图对数据集进行onehot编码,然后按特定列分组,这样我就可以为该列中的每个项目获得一行,并获得特定行的onehot列的聚合视图。它似乎适用于小数据,使用dask似乎适用于大数据集,但我在尝试保存文件时遇到了问题。我试过CSV和拼花地板文件。我想保存结果,然后可以稍后分块打开它。

下面的代码显示了这个问题(下面的脚本为onehot编码生成2M行和多达30k个唯一值(。

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait
sizeOfRows = 2000000
columnsForDF = 30000 
partitionsforDask = 500 
print("partition is ", partitionsforDask)

cluster = LocalCluster()
client = Client(cluster)
print(client)

df = pd.DataFrame(np.random.randint(0,columnsForDF,size=(sizeOfRows, 2)), columns=list('AB'))
ddf = dd.from_pandas(df, npartitions=partitionsforDask)
# ddf = ddf.persist()
wait(ddf)
# %%time
# need to globally know the categories before one hot encoding
ddf = ddf.categorize(columns=["B"])
one_hot = dd.get_dummies(ddf, columns=['B'])
print("starting groupby")
# result = one_hot.groupby('A').max().persist() # or to_parquet/to_csv/compute/etc.
# result = one_hot.groupby('A', sort=False).max().to_csv('./daskDF.csv', single_file = True)
result = one_hot.groupby('A', sort=False).max().to_parquet('./parquetFile')
wait(result)

它似乎一直有效,直到它完成csv或镶木地板的分组。在这一点上,我得到了许多关于worker超过95%内存的错误,然后程序退出并返回一个";killedworker;异常:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
KilledWorker: ("('dataframe-groupby-max-combine-3ddcd8fc854613101b4bdc7fccde32cd', 1, 0, 0)", <Worker 'tcp://127.0.0.1:33815', name: 6, memory: 0, processing: 22>)

在监控我的机器时,我从未接近超过内存,我的驱动器空间超过300 GB,从未使用过(在这个过程中没有创建任何文件,尽管它在groupby部分中(。

我能做什么?

更新-我想我应该加一个奖励。我对.to_csv也有同样的问题,因为其他人也有类似的问题,我希望它对广大观众有价值。

让我们首先考虑最终结果:它将是一个包含30'000列和30'000行的数据帧。此对象将占用大约6.7 GB的内存。(使用dtype可以减少内存占用,而且并非所有组合都可能出现在数据中,但为了简单起见,我们忽略这些点(

现在,假设我们只有两个分区,每个分区都包含所有可能的伪变量组合。这意味着每个工作者至少需要6.7 GB来存储.groupby().max()对象,但最后一步需要13.4 GB,因为最后一个工作者需要找到这两个对象的.max。当然,如果您有更多的分区,那么最后一个工作者的内存需求就会增加。有一种方法可以通过在相关函数中指定CCD_ 4来控制CCD_。例如,如果指定.max(split_every=2),则任何工作者最多将接收2个对象(split_every的默认值为8(。

在处理500个分区的早期,每个分区可能只包含可能的伪值的子集,因此内存需求很低。然而,随着dask在计算最终结果方面的进展,它将使用不同的伪值组合来组合对象,因此内存需求将在管道的末尾增长。

原则上,你也可以使用资源来限制一个工人一次要承担的任务数量,但如果工人没有足够的内存来处理这些任务,那就没有帮助了。

有什么潜在的解决办法?至少有几个选项:

  • 使用资源更大的员工;

  • 简化任务(例如,基于可能类别的子集将任务划分为几个子任务(;

  • 使用delayed/futures开发一个自定义工作流,该工作流将对数据进行排序并实现自定义优先级,确保工作人员在进行最终聚合之前完成一个子集的工作。

如果工作内存是一个约束,那么子集必须非常细粒度。例如,在极限情况下,只对一个可能的伪变量组合进行子集设置将具有非常低的内存需求(初始数据加载和过滤器仍需要足够的内存来适应分区(,但当然,这是一个极端的例子,会生成数万个任务,因此建议使用较大的类别组(平衡任务数量和内存需求(。要查看示例,您可以查看此相关答案。

最新更新