我有一些数据,我正在尝试在其上应用multiprocessing.pool,因为我有一台带有16个处理器的机器。 在这里,我生成一些伪数据:
y = pd.Series(np.random.randint(400, high=600, size=1250))
date_today = datetime.now()
x = pd.date_range(date_today, date_today + timedelta(1250), freq='D')
data = pd.DataFrame(columns=['Date','Price'])
data['Date'] = x
data['Price'] = y
d={name: group for name, group in data.groupby(np.arange(len(data)) // (len(data)))}
我真正想要的是我在 for 循环参数中应用 pool。因此,使用每个常量的处理器:
parameters = range(300,550,50)
portfolio = pd.DataFrame(columns=['Parameter','Date','Price','Calculation'])
for key, value in sorted(d.items()):
for constante in parameters:
print('Constante:',constante)
# HERE I WANT TO USE MP.POOL()
在代码中,我使用某种移位窗口来执行计算。这是代码的最简单版本。所以我想在写入 DF 时在参数中为每个常量分配一个进程。如何实现这一目标?
您需要像这样使用multiprocessing.pool.map
,尽管您可能需要根据需要进行调整...
from functools import partial
from multiprocessing import Pool
def pool_map_fn(value=None, constante=None, i=None):
s = {'val': value[i:i+constante]}
window = pd.concat([s['val']['Date'],s['val']['Price']], axis=1)
window['Price'] = pd.to_numeric(window['Price'], errors='coerce').fillna(0)
calc = window['Price'].mean()
date_variable = window['Date'].iloc[-1]
price_var = window['Price'].iloc[-1]
if price_var < calc:
print('Parameter',constante,'Lower than average',date_variable,price_var,calc)
portfolio = portfolio.append({'Parameter': constante,
'Date': date_variable,
'Price': price_var,
'Calculation': calc}, ignore_index=True)
if price_var > calc:
print('Parameter',constante,'Higher than average',date_variable,price_var,calc)
parameters = range(300,550,50)
portfolio = pd.DataFrame(columns=['Parameter','Date','Price','Calculation'])
for key, value in sorted(d.items()):
for constante in parameters:
with Pool() as pool:
results = pool.map(partial(pool_map_fn, value=value, constante=constante),
range(len(value) - constante + 1))
注意:这是未经测试的,但应该可以工作,如果您遇到错误,请尝试解决它们,因为概念应该是合理的。