Dask数据帧崩溃



我正在使用Dask加载一个大型镶木地板数据帧,但如果系统不崩溃或出现一百万个错误而没有输出,我似乎无法用它做任何事情。

压缩后的数据重量约为165M,在Panda中加载后重量为13G(它非常适合45G的RAM(。

import pandas as pd
df = pd.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum() * 1e-9
# returns 13.09
df.head()
# prints the head of the dataframe properly

相反,如果使用Dask

from dask.distributed import Client
import dask.dataframe as dataframe
client = Client()
# prints: <Client: 'tcp://127.0.0.1:38576' processes=7 threads=28, memory=48.32 GB>
df = dataframe.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum().compute() * 1e-9 

打印

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
[a large traceback]
KilledWorker: ("('series-groupby-sum-chunk-memory_usage-dc8dab46de985e36d76a85bf3abaccbf', 0)", <Worker 'tcp://127.0.0.1:36882', name: 2, memory: 0, processing: 1>)

如果我尝试执行df.head((、df.set_index(…(或任何其他实际计算数据帧上任何内容的操作,也会发生同样的情况。我试着减少工作者的数量,这样每个人都有更多的记忆。我也尝试过重新划分数据帧,但也失败了,出现了同样的错误。如果我将客户端的LocalCluster上的memory_limit设置为零,系统就会完全崩溃。

我做错了什么?

编辑:以下是一些关于数据的额外信息(从Pandas加载数据中获得(

In [2]: print(df.dtypes)                                                                                                            
market_id          uint32
choice_id          uint64
attribute_1          bool
attribute_2          bool
attribute_3          bool
income            float32
is_urban             bool
distance          float32
weight            float32
quarter            uint32
product_id          int64
price             float64
size              float32
share             float32
market_quarter      int64
product_type       object
outside_option      int64
dtype: object
In [3]: print(df.shape)                                                                                                             
(89429613, 17)

对象product_type是一个字符串。

Dask的工作原理是加载和处理数据块。在镶木地板的情况下,这种分块的来源于数据文件本身:内部镶木地板被组织成"块";行组";,要一起读取的行集。

在这种情况下,整个数据集似乎由一个文件中的一个行组组成。这意味着Dask没有机会将数据分割成块;您得到一个任务,它占用了一个工作线程的全部内存压力(可能等于总数据大小加上一些临时值(,而该任务只分配了总系统内存的一部分。因此出现了错误。

请注意,您可以关闭内存监控,以防止工作人员在配置中被杀死,或者直接使用memory_limit=0等关键字。在这种情况下,您知道只有一个工作人员将承担负载。

在某些非常特殊的情况下(没有嵌套/列表/映射类型(,可以拆分行组,但不存在用于此操作的代码,而且由于数据的压缩和编码,这将是低效的。

相关内容

  • 没有找到相关文章

最新更新