如何处理并行返回大结果的小数据帧



我有一个大约有6000万行的Pandas DataFrame。前60行对应于第一组,依此类推。这些组中的每一个都需要并行处理,并且每个都返回一个大于4GB的NumPy数组。我有足够的RAM和内核来并行处理大约100个这样的组。最后,我必须将每组的结果相加才能得到最终结果,所以最终结果不大于每组的结果。

处理过程大致是:在给定大小的网格(NumPy数组(上对一组数据进行装箱,并计算该网格与自身的张量外积(numpy.multiply.outer(。然后对得到的乘积求和,得到我的最终数组。

如果我按顺序处理这些组,处理可能需要几天时间。因此,我要求并行处理它们。

首先,我尝试了multiprocessing。由于组的数量很大,我将数据帧分为块,每个块有10000个组。函数将接收一个块,处理块中的每个组,并返回这些结果的总和(我的最终结果的部分总和(。为了得到我的最终结果,我只需要对每个过程的结果求和。我可以看到进程并行运行,但当返回结果时,我得到了错误'OverflowError('cannot serialize a bytes objects larger than 4GiB',)'。我尝试了这个解决方案:python multiprocessing-OverflowError(';无法序列化大于4GiB';的字节对象(,这导致我出现了答案中提到的第二个错误。然而,我不能将我的函数重新定义为void,因为这需要我将数据存储在文件中,而且我没有足够的磁盘空间来存储所有数据。此外,进程将竞争写入磁盘,这将产生瓶颈;然后读取文件、重建数组和添加结果也需要很长时间。

然后我尝试了Dask。首先,我将Dask数据帧划分为60组,创建一个处理每个组的函数,并使用delayed调用它。

from dask import delayed
results = []
for partition in ddf.partitions:
result = delayed(func)(partition)
results.append(result)
delayed(sum)(results).compute()

然而,大多数进程大部分时间都处于休眠状态,我看不到太多的并行性。显然,Dask不能很好地处理大型任务图。

为了避免出现大的任务图,我将函数替换为一个采用大数据帧(包含许多组(并在函数内处理该数据帧的每个组的函数(类似于multiprocessing方法(。

import numpy as np, pandas as pd
def func(df):
group_len = 60
ngroups = int(len(df) / group_len) # len(df) is always a multiple of group_len
sum_array = np.zeros(output_expected_shape) # Here the shape can be up to 6-dimensional
for group in range(ngroups):
# Do the processing...
sum_array += group_result
return sum_array

我对数据帧进行了分区,使每个分区都有600000行(100个组(,并使用delayed方法对其进行了调用。然而,再一次,大多数进程大部分时间都处于休眠状态,我无法观察到真正的并行性。我还注意到,使用GUI时,工作人员存储了大量数据,尽管有足够的RAM可用。我试着让每个分区有6000000行,但这也不起作用(同时让所有核心都闲置似乎不是最佳解决方案(。

然后,我尝试了Daskmap_partitions。它的问题是,当您需要每个函数生成一个NumPy数组时,它不能很好地工作。你可以看到这个问题的问题是什么:如何在Dask中为每个分区返回一个NumPy数组?。但总的来说,它返回一个单独的数组,其中部分结果垂直堆叠。为了得到真正的结果,我必须对数组进行切片,取与部分结果相对应的每个元素并将它们相加。但是1(破坏了使用并行性的点,2(返回的数组可能太大,因为它包含许多不同的结果,并且可能不适合内存。

显然,如果定义了chunksize,但块大小是由数据量("16MB"、"3GB"(而不是行数(这是我所需要的,因为组对应于特定数量的行(定义的,那么map_partitions方法会很好地工作。

我希望能够高效地并行处理这些组。到目前为止,顺序解决方案仍然是我的最佳选择(这是唯一一个最终让我得出结果的解决方案,因为Dask工作人员在一段时间后开始引发TCP超时错误(,但它太慢了,导致大量资源未使用。

使用delayed(sum)(results).compute()的问题是您请求将所有results同时传递给sum。这对于一个小的结果列表来说不是问题,但当您的结果列表超过工作线程的总内存容量时,您的管道就会中断。

解决此问题的最简单方法是使用现有集合来实现,例如使用此处的建议。(你提到了行分组的问题,但这在这里得到了解决(

减少内存使用的另一种方法是使用tree summation之类的东西积极地聚合所有分区/块/阵列,请参阅docs:

L = zs
while len(L) > 1:
new_L = []
for i in range(0, len(L), 2):
lazy = add(L[i], L[i + 1])  # add neighbors
new_L.append(lazy)
L = new_L                       # swap old list for new
dask.compute(L)

最新更新