使用dask和multiprocessing优化内存使用



我使用dask来处理来自许多参数变化的数据,我的目标是通过对形状小于2000的小数组构造的dask数组的操作构建一个600,000(例数或列数)的最终dask数据框架。这里,我的最终数据帧计算为6400例

dd_final.compute()
0_95_euclidean  1_95_euclidean  2_95_euclidean  3_95_euclidean  ...  96_80_l1  97_80_l1  98_80_l1  99_80_l1
0           0.005670        0.010449        0.010756        0.009914  ...  0.007422  0.002066  0.009693  0.003475
1           0.006255        0.009970        0.007987        0.007785  ...  0.006119  0.002104  0.009638  0.004142
2           0.011956        0.018662        0.016426        0.015260  ...  0.013276  0.003897  0.019816  0.007479
3           0.021639        0.037590        0.036749        0.028090  ...  0.029751  0.009725  0.038956  0.011870
4           0.014482        0.022963        0.025416        0.017909  ...  0.017033 -0.002616  0.026231  0.000978
...              ...             ...             ...             ...  ...       ...       ...       ...       ...
1289        0.597443        1.044522        0.898732        0.940219  ...  0.914094  0.792133  0.744501  0.632575
1290        0.594463        1.041562        0.894501        0.935068  ...  0.913409  0.790555  0.742357  0.628366
1291        0.592523        1.035600        0.891222        0.932510  ...  0.907414  0.786722  0.738844  0.627611
1292        0.606415        1.059642        0.912963        0.951523  ...  0.922719  0.800610  0.751161  0.640515
1293        0.601242        1.049654        0.903112        0.942681  ...  0.915391  0.794133  0.744752  0.636788
[1294 rows x 6400 columns]

第一种方法:我对每个函数使用池starmap来加速8核CPU的操作,并将结果放入任务数组。

def MP_a_func(func,iterable,proc,chunk):
#
pool=multiprocessing.Pool(processes=proc)
Result=pool.starmap_async(func,iterable,chunksize=chunk)
#
return Result
if __name__ == '__main__':
performances=MP_a_func(Post_processing_Weights,iterable,proc,chunk)
da_arr=da.from_array(performances.get(),chunks=chunk)
#... Some operations
#...
dd_final=dd.from_dask_array(da_arr).repartition(chunk)

这种方法失败,因为在将MP对象存储到磁盘数组之前,内存不足以存储MP对象。

第二种方法:我想使用pool starmap但要对可迭代对象进行切片并在每个切片处附加任务数组

for iterable in [iter1,iter2,...,iter10000]:
if __name__ == '__main__':
performances=MP_a_func(Post_processing_Weights,iterable,proc,chunk)
partial_da_arr=da.from_array(performances.get(),chunks=chunk)
# append or assign to da_arr ??

如何在每一步使用append或分配给任务数组而不加载内存或有更好的方法?

谢谢你的帮助

600000(大小写或列数)

根据您在示例中提供的列的名称判断,您的工作流可能受益于更好地重新组织数据,这也可能简化计算。

我如何在每一步使用append或分配给任务数组而不加载内存或有更好的方法?

您可能对delayedAPI感兴趣,但问题/问题不够清楚,无法提供进一步的建议。

最新更新