并行处理数据帧



我有一个过程,需要处理数据帧的每一行,然后在每一行上附加一个新值。它是一个大型数据帧,每次处理一个数据帧需要数小时。

如果我有一个iterrow循环,它将每一行发送到一个函数,我可以并行处理以加快速度吗?该行的结果与无关

基本上我的代码类似于

for index, row in df.iterrows():
row['data'] = function[row]

有没有一种简单的方法可以加快处理速度?

虽然对行进行迭代不是一种好的做法,而且可以使用Grobby/transform聚合等替代逻辑,但如果在最坏的情况下确实需要这样做,请遵循答案。此外,你可能不需要在这里重新实现所有东西,你可以使用Dask这样的库,它是在熊猫的基础上构建的。

但为了给出Idea,您可以将multiprocessing(Pool.map(与chunking组合使用。读取chunk中的csv(或制作答案末尾提到的chuck(并将其映射到池,在处理每个chunk时添加新行(或将它们添加到列表中并制作新chunk(并从函数返回。

最后,当所有池都被执行时,将数据帧组合起来。

import pandas as pd
import numpy as np
import multiprocessing

def process_chunk(df_chunk):

for index, row in df_chunk.reset_index(drop = True).iterrows():
#your logic for updating this chunk or making new chunk here

print(row)

print("index is " + str(index))
#if you can added to same df_chunk, return it, else if you appended
#rows to have list_of_rows, make a new df with them and return
#pd.Dataframe(list_of_rows)  
return df_chunk   

if __name__ == '__main__':
#use all available cores , otherwise specify the number you want as an argument,
#for example if you have 12 cores,  leave 1 or 2 for other things
pool = multiprocessing.Pool(processes=10) 

results = pool.map(process_chunk, [c for c in pd.read_csv("your_csv.csv", chunksize=7150)])
pool.close()
pool.join()

#make new df by concatenating

concatdf = pd.concat(results, axis=0, ignore_index=True)

注意:您可以通过相同的逻辑传递chuck,而不是读取csv,以计算块大小,您可能需要类似round_of( (length of df) / (number of core available-2))的东西,例如每个块的100000/14 = round(7142.85) = 7150 rows

results = pool.map(process_chunk,
[df[c:c+chunk_size] for c in range(0,len(df),chunk_size])

与其使用df.iterrows(),为什么不使用像apply()这样的矢量化方法呢?

df.apply(function, axis=1)

.apply((是Pandas对列/行执行迭代的方法。它利用了矢量化技术,将简单和复杂操作的执行速度提高了许多倍。

查看这篇参考文章,了解它的不同之处。

其他选择是DaskVaex或只是好的老式Multiprocessing

最新更新