Using pool.map to send each loop iteration's pandas calculation to a different thread (Python 3.6.5)



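I have a basic pandas DataFrame of financial-market OHLCV data and I am trying to add a large number of calculated columns to it. With this many columns and calculations it is slow, slow, slow! I tried multiprocessing with pool.map but got nowhere. Ideally, each iteration of the loop would be sent to its own thread. The code below uses a simplified moving average as the example: the plain dictionary plus rolling-mean loop works, just slowly, while the pool.map version fails with "TypeError: map() missing 1 required positional argument: 'iterable'". All help appreciated.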

import pandas as pd
from multiprocessing.dummy import Pool as ThreadPool
#####################################################
# DJIA_OHLCV_test.csv has format:
# Date,Open,High,Low,Close,Adj Close,Volume
# 1/2/2015,17823.07031,17951.7793,17731.30078,17832.99023,17832.99023,76270000
# 1/3/2015,17823.07031,17951.7793,17731.30078,17832.99023,17832.99023,76270000
DJIA = pd.read_csv('DJIA_OHLCV_test.csv')
"""
#####################################################
# # This works! please comment out to switch 
# MAdict = {'MA50':50, 'MA100':100, 'MA200':200} # Define Moving Average Windows
# for MAkey in MAdict:
#     DJIA[('ma' + MAkey)] = pd.Series.rolling(DJIA['Adj Close'], window=MAdict[MAkey]).mean()
#####################################################
"""
# This doesn't work! please comment out to switch 
MAdict = {'MA50':50, 'MA100':100, 'MA200':200}
pool = ThreadPool(3)
def moving_average(MAkey):
    return pd.Series.rolling(DJIA['Adj Close'], window=MAdict[MAkey]).mean()
for MAkey in MAdict:
    DJIA[('ma' + MAkey)] = pool.map(moving_average(MAkey))
#####################################################
print(DJIA.tail())

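pool.map(func, iterable) expects the function object itself plus an iterable of arguments, and it calls func on each item inside the pool. Your code calls moving_average(MAkey) immediately in the main thread and passes only its result, with no iterable, which is why you get the TypeError. pool.map is also a blocking call that returns only when every item is done, so instead of looping over MAdict and calling pool.map once per key, pass the whole iterable to a single pool.map call: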
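To make the difference concrete, here is a minimal sketch using a toy square function (hypothetical, not from the question) on the same thread-backed Pool from multiprocessing.dummy. The exact wording of the TypeError differs between Python versions (newer ones prefix it with "Pool."), so the comments describe it rather than quote it.

```python
from multiprocessing.dummy import Pool as ThreadPool  # thread-backed Pool

def square(x):
    # Hypothetical toy worker, standing in for moving_average.
    return x * x

error_message = ""
with ThreadPool(3) as pool:
    # Wrong: square(4) runs right away in the main thread, and its result (16)
    # is handed to map() as the "function" with no iterable -> TypeError.
    try:
        pool.map(square(4))
    except TypeError as exc:
        error_message = str(exc)

    # Right: pass the function object plus an iterable of arguments; the pool
    # calls square() once per item and returns the results in input order.
    squares = pool.map(square, [1, 2, 3, 4])

print(error_message)  # a "missing 1 required positional argument: 'iterable'" error
print(squares)
```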

import pandas as pd
from multiprocessing.dummy import Pool

def moving_average(ma):
    return pd.Series.rolling(djia['Adj Close'], window=ma).mean()

if __name__ == '__main__':
    N_WORKERS = 3
    MA_DICT = {'MA50':50, 'MA100':100, 'MA200':200}
    djia = pd.read_csv('DJIA_OHLCV_test.csv')
    with Pool(N_WORKERS) as pool:
        results = pool.map(moving_average, iterable=MA_DICT.values())
    # concatenate results and rename columns
    results = pd.concat(results, axis=1)
    results.columns = ['ma' + key for key in MA_DICT]
    djia = pd.concat([djia, results], axis=1)
    print(djia.tail())
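For readers without DJIA_OHLCV_test.csv, the sketch below applies the same approach to a hypothetical random-walk price series. It also swaps the global djia lookup for functools.partial, so the worker receives the price series explicitly; MA_DICT and the ma-prefixed column names follow the answer. Because multiprocessing.dummy runs the workers as threads in one process, nothing has to be pickled, but threads only run in parallel where the underlying pandas code releases the GIL, so it is worth timing this against the plain loop from the question.

```python
from functools import partial
from multiprocessing.dummy import Pool  # thread pool with the multiprocessing.Pool API

import numpy as np
import pandas as pd

def moving_average(prices, window):
    """Rolling mean of prices over `window` rows (NaN until the window fills)."""
    return prices.rolling(window=window).mean()

MA_DICT = {'MA50': 50, 'MA100': 100, 'MA200': 200}

# Hypothetical stand-in for DJIA_OHLCV_test.csv: 300 rows of a random-walk price.
rng = np.random.default_rng(0)
djia = pd.DataFrame({'Adj Close': 17800 + rng.standard_normal(300).cumsum()})

with Pool(3) as pool:
    # partial() binds the price series, so each map() item is just a window size.
    results = pool.map(partial(moving_average, djia['Adj Close']), list(MA_DICT.values()))

# Each result is a Series aligned on djia's index; keys= names the new columns.
ma_frame = pd.concat(results, axis=1, keys=['ma' + key for key in MA_DICT])
djia = pd.concat([djia, ma_frame], axis=1)
print(djia.tail())
```

The keys argument of pd.concat names each column as the Series are placed side by side, which replaces the separate results.columns assignment in the answer.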
