我编写了一个函数,该函数接收带有关键字参数的数据帧,如下所示:
df1 = df1.apply(add_data_ip2, axis=1, result_type="expand")
这个过程需要20分钟。函数add_ data_;Ticker";列,并执行api调用来检索财务信息,并通过数学运算处理数据来计算分数。分数在";得分";相同df的列。函数返回相同的数据帧。
df包含大约1500个股票代码,我正试图运行以下并行处理代码来减少等待时间,但没有成功。该功能保持运行,没有任何输出指示。有人能告诉我问题出在哪里吗?我将夸尔格传递到函数中的方式有什么问题吗。我试着在stackoverflow中寻找答案,但没有成功。感谢您的帮助。
from functools import partial
mapfunc = partial(add_data_ip2, axis=1, result_type="expand")
p = mp.Pool(mp.cpu_count())
df1= p.map(mapfunc, df1)
df1
另一个替代块也没有给出任何输出。
from multiprocessing import Pool
def parallelize_dataframe(df, func, n_cores=4):
df_split = np.array_split(df, n_cores)
pool = Pool(n_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
train = parallelize_dataframe(df1, mapfunc)'
就我而言,您的partial
是错误的。
您不能将df
发送到部分partial(add_data_ip2,...)
并作为df.apply(add_data_ip2, ...)
运行,因为partial
将尝试将其作为add_data_ip2(..., df)
运行
其他问题:axis=1, result_type="expand"
是df.apply()
的参数,但partial
会将其作为add_data_ip2(..., axis=1, result_type="expand")
运行
至于我,你应该定义正常功能
def mapfunc(dfx):
return dfx.apply(add_data_ip2, axis=1, result_type="expand")
或lambda
mapfunc = lambda dfx: dfx.apply(add_data_ip2, axis=1, result_type="expand")
但据我所知,Pool
不能与lambda
一起工作,因为它必须将函数和数据保存在pickle
中,之后process
必须读取它——但pickle
不能保存lambda
我用于测试的代码
import pandas as pd
from functools import partial
from multiprocessing import Pool
import numpy as np
data = {
'A': [1,2,3],
'B': [4,5,6],
'C': [7,8,9]
}
df = pd.DataFrame(data)
print('--- original ---')
print(df)
def add_data_ip2(row):
row['A'] += 10
row['B'] += 100
row['C'] += 1000
return row
# --- test 1 ---
#new_df = df.apply(add_data_ip2, axis=1, result_type="expand")
#print(new_df) # OK
# --- test 2 ---
#mapfunc = partial(add_data_ip2, axis=1, result_type="expand")
#new_df = mapfunc(df) # ERROR
#print(new_df)
# --- test 3 ---
#mapfunc = lambda df: df.apply(add_data_ip2, axis=1, result_type="expand")
#new_df = mapfunc(df) # OK
#print(new_df)
# --- test 4 ---
def mapfunc(df):
return df.apply(add_data_ip2, axis=1, result_type="expand")
new_df = mapfunc(df) # OK
print('--- mapfunc ---')
print(new_df)
# --- test Pool ---
p = Pool()
parts = np.array_split(df, 4)
results = p.map(mapfunc, parts)
print('--- Poll results ---')
for item in results:
print(item)
print('---')
print('--- concat new df ---')
new_df = pd.concat(results)
print(new_df)