将kwargs传递给pandas Dataframe的multiprocessing.pool.map函数



我编写了一个函数,该函数接收带有关键字参数的数据帧,如下所示:

df1 = df1.apply(add_data_ip2, axis=1, result_type="expand")

这个过程需要20分钟。函数add_ data_;Ticker";列,并执行api调用来检索财务信息,并通过数学运算处理数据来计算分数。分数在";得分";相同df的列。函数返回相同的数据帧。

df包含大约1500个股票代码,我正试图运行以下并行处理代码来减少等待时间,但没有成功。该功能保持运行,没有任何输出指示。有人能告诉我问题出在哪里吗?我将夸尔格传递到函数中的方式有什么问题吗。我试着在stackoverflow中寻找答案,但没有成功。感谢您的帮助。

from functools import partial
mapfunc = partial(add_data_ip2, axis=1, result_type="expand")
p = mp.Pool(mp.cpu_count())
df1= p.map(mapfunc, df1)
df1

另一个替代块也没有给出任何输出。

from multiprocessing import  Pool

def parallelize_dataframe(df, func, n_cores=4):
df_split = np.array_split(df, n_cores)
pool = Pool(n_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
train = parallelize_dataframe(df1, mapfunc)'

就我而言,您的partial是错误的。

您不能将df发送到部分partial(add_data_ip2,...)并作为df.apply(add_data_ip2, ...)运行,因为partial将尝试将其作为add_data_ip2(..., df)运行

其他问题:axis=1, result_type="expand"df.apply()的参数,但partial会将其作为add_data_ip2(..., axis=1, result_type="expand")运行

至于我,你应该定义正常功能

def mapfunc(dfx):
return dfx.apply(add_data_ip2, axis=1, result_type="expand")

lambda

mapfunc = lambda dfx: dfx.apply(add_data_ip2, axis=1, result_type="expand")

但据我所知,Pool不能与lambda一起工作,因为它必须将函数和数据保存在pickle中,之后process必须读取它——但pickle不能保存lambda


我用于测试的代码

import pandas as pd
from functools import partial
from multiprocessing import Pool
import numpy as np
data = {
'A': [1,2,3], 
'B': [4,5,6], 
'C': [7,8,9]
}
df = pd.DataFrame(data)
print('--- original ---')
print(df)
def add_data_ip2(row):
row['A'] += 10
row['B'] += 100
row['C'] += 1000
return row
# --- test 1 ---
#new_df = df.apply(add_data_ip2, axis=1, result_type="expand")
#print(new_df)  # OK
# --- test 2 ---
#mapfunc = partial(add_data_ip2, axis=1, result_type="expand")
#new_df = mapfunc(df)  # ERROR
#print(new_df)

# --- test 3 ---
#mapfunc = lambda df: df.apply(add_data_ip2, axis=1, result_type="expand")
#new_df = mapfunc(df)  # OK
#print(new_df)
# --- test 4 ---
def mapfunc(df):
return df.apply(add_data_ip2, axis=1, result_type="expand")
new_df = mapfunc(df)  # OK
print('--- mapfunc ---')
print(new_df)
# --- test Pool ---
p = Pool()
parts = np.array_split(df, 4)
results = p.map(mapfunc, parts)
print('--- Poll results ---')
for item in results:
print(item)
print('---')

print('--- concat new df ---')    
new_df = pd.concat(results)
print(new_df)

最新更新