Python 并行数据处理



我们有一个数据集,大约有1.5MM行。我想并行处理这个问题。该代码的主要功能是查找主信息并丰富 1.5MM 行。主数据集是一个两列数据集,大约有 25000 行。但是,我无法使多进程正常工作并正确测试其可扩展性。有人可以帮忙吗? 代码的精简版本如下

import pandas
from multiprocessing import Pool
def work(data):
    mylist =[]
    #Business Logic
    return mylist.append(data)
if __name__ == '__main__':
    data_df = pandas.read_csv('D:\retail\customer_sales_parallel.csv',header='infer')
    print('Source Data :', data_df)
    agents = 2
    chunksize = 2
    with Pool(processes=agents) as pool:
            result = pool.map(func=work, iterable= data_df, chunksize=20)
            pool.close()
            pool.join()
    print('Result :', result)

方法 work 将具有业务逻辑,我想将分区data_df传递到work中以启用并行处理。示例数据如下

CUSTOMER_ID,PRODUCT_ID,SALE_QTY
641996,115089,2
1078894,78144,1
1078894,121664,1
1078894,26467,1
457347,59359,2
1006860,36329,2
1006860,65237,2
1006860,121189,2
825486,78151,2
825486,78151,2
123445,115089,4

理想情况下,我想在每个分区中处理 6 行。

请帮忙。

感谢和问候

巴拉

首先

work返回mylist.append(data)的输出,即None。我假设(如果没有,我建议)你想返回一个处理过的数据帧。

若要分配负载,可以使用 numpy.array_split 将大型数据帧拆分为 6 行数据帧的列表,然后由 work 处理。

import pandas
import math
import numpy as np
from multiprocessing import Pool
def work(data):
    #Business Logic
    return data # Return it as a Dataframe
if __name__ == '__main__':
    data_df = pandas.read_csv('D:\retail\customer_sales_parallel.csv',header='infer')
    print('Source Data :', data_df)
    agents = 2
    rows_per_workload = 6
    num_loads = math.ceil(data_df.shape[0]/float(rows_per_workload))
    split_df = np.array_split(data_df, num_loads) # A list of Dataframes
    with Pool(processes=agents) as pool:
        result = pool.map(func=work, iterable=split_df)
        result = pandas.concat(result) # Stitch them back together    
        pool.close()
        pool.join()pool = Pool(processes=agents)
    print('Result :', result)

我最好的建议是让您在 read_csv (Docs) 中使用 chunksize 参数并迭代。这样,您就不会在尝试加载所有内容时崩溃您的 ram,而且如果您愿意,例如您可以使用线程来加快该过程。

for i,chunk in enumerate(pd.read_csv('bigfile.csv', chunksize=500000)):

我不确定这是否回答了您的具体问题,但我希望它有所帮助。

最新更新