我们有一个数据集,大约有1.5MM行。我想并行处理这个问题。该代码的主要功能是查找主信息并丰富 1.5MM 行。主数据集是一个两列数据集,大约有 25000 行。但是,我无法使多进程正常工作并正确测试其可扩展性。有人可以帮忙吗? 代码的精简版本如下
import pandas
from multiprocessing import Pool
def work(data):
mylist =[]
#Business Logic
return mylist.append(data)
if __name__ == '__main__':
data_df = pandas.read_csv('D:\retail\customer_sales_parallel.csv',header='infer')
print('Source Data :', data_df)
agents = 2
chunksize = 2
with Pool(processes=agents) as pool:
result = pool.map(func=work, iterable= data_df, chunksize=20)
pool.close()
pool.join()
print('Result :', result)
方法 work
将具有业务逻辑,我想将分区data_df传递到work
中以启用并行处理。示例数据如下
CUSTOMER_ID,PRODUCT_ID,SALE_QTY
641996,115089,2
1078894,78144,1
1078894,121664,1
1078894,26467,1
457347,59359,2
1006860,36329,2
1006860,65237,2
1006860,121189,2
825486,78151,2
825486,78151,2
123445,115089,4
理想情况下,我想在每个分区中处理 6 行。
请帮忙。
感谢和问候
巴拉
,work
返回mylist.append(data)
的输出,即None
。我假设(如果没有,我建议)你想返回一个处理过的数据帧。
若要分配负载,可以使用 numpy.array_split
将大型数据帧拆分为 6 行数据帧的列表,然后由 work
处理。
import pandas
import math
import numpy as np
from multiprocessing import Pool
def work(data):
#Business Logic
return data # Return it as a Dataframe
if __name__ == '__main__':
data_df = pandas.read_csv('D:\retail\customer_sales_parallel.csv',header='infer')
print('Source Data :', data_df)
agents = 2
rows_per_workload = 6
num_loads = math.ceil(data_df.shape[0]/float(rows_per_workload))
split_df = np.array_split(data_df, num_loads) # A list of Dataframes
with Pool(processes=agents) as pool:
result = pool.map(func=work, iterable=split_df)
result = pandas.concat(result) # Stitch them back together
pool.close()
pool.join()pool = Pool(processes=agents)
print('Result :', result)
我最好的建议是让您在 read_csv (Docs) 中使用 chunksize 参数并迭代。这样,您就不会在尝试加载所有内容时崩溃您的 ram,而且如果您愿意,例如您可以使用线程来加快该过程。
for i,chunk in enumerate(pd.read_csv('bigfile.csv', chunksize=500000)):
我不确定这是否回答了您的具体问题,但我希望它有所帮助。