使用金融市场OHLCV数据的基本熊猫DF,我正在尝试向df添加大量计算列。大量的列和计算使这个 慢 慢 慢! 尝试使用 pool.map 进行多进程处理,但一无所获。 理想情况下,循环的每次迭代都应发送到离散线程。下面代码中的简化移动平均线。 显示简单的字典和滚动平均工作缓慢 类型错误:map(( 缺少 1 个必需的位置参数:"可迭代" 所有帮助感谢
import pandas as pd
from multiprocessing.dummy import Pool as ThreadPool
#####################################################
# DJIA_OHLCV_test.csv has format:
# Date,Open,High,Low,Close,Adj Close,Volume
#
1/2/2015,17823.07031,17951.7793,17731.30078,17832.99023,17832.99023,76270000
#
1/3/2015,17823.07031,17951.7793,17731.30078,17832.99023,17832.99023,76270000
DJIA = pd.read_csv('DJIA_OHLCV_test.csv')
"""
#####################################################
# # This works! please comment out to switch
# MAdict = {'MA50':50, 'MA100':100, 'MA200':200} # Define Moving Average
Windows
# for MAkey in MAdict:
# DJIA[('ma' + MAkey)] = pd.Series.rolling(DJIA['Adj Close'],
window=MAdict[MAkey]).mean()
#####################################################
"""
# This doesn't work! please comment out to switch
MAdict = {'MA50':50, 'MA100':100, 'MA200':200}
pool = ThreadPool(3)
def moving_average(MAkey):
return pd.Series.rolling(DJIA['Adj Close'], window=MAdict[MAkey]).mean()
for MAkey in MAdict:
DJIA[('ma' + MAkey)] = pool.map(moving_average(MAkey))
#####################################################
print(DJIA.tail())
pool.map
是一个阻塞调用,因此您需要将可迭代对象作为参数直接传递给pool.map
,而不是遍历MAdict
并调用pool.map
:
import pandas as pd
from multiprocessing.dummy import Pool
def moving_average(ma):
return pd.Series.rolling(djia['Adj Close'], window=ma).mean()
if __name__ == '__main__':
N_WORKERS = 3
MA_DICT = {'MA50':50, 'MA100':100, 'MA200':200}
djia = pd.read_csv('DJIA_OHLCV_test.csv')
with Pool(N_WORKERS) as pool:
results = pool.map(moving_average, iterable=MA_DICT.values())
# concatenate results and rename columns
results = pd.concat(results, axis=1)
results.columns = ['ma' + key for key in MA_DICT]
djia = pd.concat([djia, results], axis=1)
print(djia.tail())