我正在使用Dask加载一个大型镶木地板数据帧,但如果系统不崩溃或出现一百万个错误而没有输出,我似乎无法用它做任何事情。
压缩后的数据重量约为165M,在Panda中加载后重量为13G(它非常适合45G的RAM(。
import pandas as pd
df = pd.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum() * 1e-9
# returns 13.09
df.head()
# prints the head of the dataframe properly
相反,如果使用Dask
from dask.distributed import Client
import dask.dataframe as dataframe
client = Client()
# prints: <Client: 'tcp://127.0.0.1:38576' processes=7 threads=28, memory=48.32 GB>
df = dataframe.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum().compute() * 1e-9
打印
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
[a large traceback]
KilledWorker: ("('series-groupby-sum-chunk-memory_usage-dc8dab46de985e36d76a85bf3abaccbf', 0)", <Worker 'tcp://127.0.0.1:36882', name: 2, memory: 0, processing: 1>)
如果我尝试执行df.head((、df.set_index(…(或任何其他实际计算数据帧上任何内容的操作,也会发生同样的情况。我试着减少工作者的数量,这样每个人都有更多的记忆。我也尝试过重新划分数据帧,但也失败了,出现了同样的错误。如果我将客户端的LocalCluster上的memory_limit设置为零,系统就会完全崩溃。
我做错了什么?
编辑:以下是一些关于数据的额外信息(从Pandas加载数据中获得(
In [2]: print(df.dtypes)
market_id uint32
choice_id uint64
attribute_1 bool
attribute_2 bool
attribute_3 bool
income float32
is_urban bool
distance float32
weight float32
quarter uint32
product_id int64
price float64
size float32
share float32
market_quarter int64
product_type object
outside_option int64
dtype: object
In [3]: print(df.shape)
(89429613, 17)
对象product_type是一个字符串。
Dask的工作原理是加载和处理数据块。在镶木地板的情况下,这种分块的来源于数据文件本身:内部镶木地板被组织成"块";行组";,要一起读取的行集。
在这种情况下,整个数据集似乎由一个文件中的一个行组组成。这意味着Dask没有机会将数据分割成块;您得到一个任务,它占用了一个工作线程的全部内存压力(可能等于总数据大小加上一些临时值(,而该任务只分配了总系统内存的一部分。因此出现了错误。
请注意,您可以关闭内存监控,以防止工作人员在配置中被杀死,或者直接使用memory_limit=0
等关键字。在这种情况下,您知道只有一个工作人员将承担负载。
在某些非常特殊的情况下(没有嵌套/列表/映射类型(,可以拆分行组,但不存在用于此操作的代码,而且由于数据的压缩和编码,这将是低效的。