Python如何用pandas|numpy对这个嵌套循环进行矢量化



我有一个股票数据集,为了简化这个问题,它只有一个"关闭"列,看起来像这样:

import pandas as pd
df = pd.DataFrame({'close': [1, 1.01, 1.015, 1.0, 0.98]})
print(df)
close
0  1.000
1  1.010
2  1.015
3  1.000
4  0.980

现在我想对所有数据点进行分类,看它们是否是有利可图的长期机会。有利可图的长期机会意味着,未来的价格将达到一个特定的水平,该水平将大于或等于指定的获胜水平。与此相反,如果之前达到亏损水平,它将无法盈利。

我目前正在使用这个功能:

def classify_long_opportunities(
df: pd.DataFrame,
profit_pct: float = 0.01,  # won after one percent
loss_pct: float = 0.01,    # lost after one percent
):
res = []
for t in df[['close']].itertuples():
idx, c = t                                      # extract index and close price
win  = c + c * profit_pct                       # calculate the desired win level
loss = c - c * loss_pct                         # "" loss level
for e in df[['close']].loc[idx:].itertuples():  # iterate through the future prices
_, p_c = e                                  # extract posterior closing price
if c < win <= p_c:                          # found a fitting profit level
res.append(1)
break
elif c > loss >= p_c:                       # found a losing level
res.append(-1)
break
else:                                           # didn't found fitting level at all
res.append(0)
df['long_opportunities'] = res

功能分类正确:

classify_long_opportunities(df=df)
print(df)
close  long_opportunities
0  1.000                   1
1  1.010                  -1
2  1.015                  -1
3  1.000                  -1
4  0.980                   0

但速度很慢。如何使用矢量化(如numpy.wherenumpy.selectpandas函数(来优化此函数?

代码优化

保持完全相同的想法,包括迭代,你的代码可以被矢量化一点,至少失去了内部的循环

def classOpt(df, profit_pct=1.01, loss_pct=0.99):
vals=df.close.values
res=[]
for i in range(len(df)):
win=vals[i]*profit_pct
loss=vals[i]*loss_pct
futw=np.argmax(vals[i:]>=win)
futl=np.argmax(vals[i:]<=loss)
if (futw>0) and (futl==0 or futl>futw):
res.append(1)
elif (futl>0) and (futw==0 or futw>futl):
res.append(-1)
else:
res.append(0)
df['opt']=res

这个想法是,在每个阶段,至少要"工作";矢量化的";未来值数组的方式。所以在第i阶段,在vals[i:]上。我们得到一个布尔数组,表示哪个未来值是获胜的vals[i:]>=win。哪一个是损失vals[i:]<=loss

有了np.argmax,我们可以很容易地知道这种输赢何时发生。np.argmax(vals[i:]>=win)。注意,由于我们在未来的值中包含了列i(实际上是一个sentinel(,所以我们知道第一个布尔值必须是False。因此,如果np.argmax(vals[i:]>=win)为0,那就意味着未来不会有胜利。如果不是0,则为将来第一次获胜的天数。

同样,对于未来损失

所以,如果futw是非零的,并且futl是0或者大于futw,则结果是1。也就是说,如果有一场胜利即将到来,要么没有损失,要么在未来比胜利更进一步的损失(再次,我觉得这是一个奇怪的规则,但这是你的代码之一(

对称情况是a-1。否则为0。

滑动窗口方法

(注意:这是我在一两周内第四次在SO问题中使用此函数。有点重复:-(。在选定的答案中,顺便说一句,到目前为止,它真的很有效。我担心这一次,如果mozway设法纠正结果差异,它将不会顺利进行(。

该方法基于np.lib.stride_tricks.sliding_window_view函数。

如果M[1,2,3,10,20,30,40],则sliding_window_view(M, (3,))

[[1,   2,  3],
[2,   3, 10],
[3,  10, 20],
[10, 20, 30],
[20, 30, 40]]

我想你已经看到了它如何对具有未来价值的计算有用。

它的一个美妙之处在于,它只是一种景色。因此,没有真正为这个(可能是巨大的(数组分配内存。

在您的情况下,因为我们想要所有未来的值,所以我们需要len(df)列。既然我们想要,即使是最后一行,我们也需要首先用一些NaN填充这些值。len(df)-1NaN精确,因此最后一行可以具有与第一行一样多的(无效(预测。

然后我们有一个len(df)×len(df)视图。第一列为实际值。其他每列都是未来的值,在D+1,D+2。。。

从那时起,我们只需要做与以前完全相同的事情,使用argmax(...>win)

这是代码

def slide(df, profit_pct=1.01, loss_pct=0.99):
n=len(df)
valswithnan=np.concatenate([df.close.values, [np.nan]*(n-1)])
view=np.lib.stride_tricks.sliding_window_view(valswithnan, (n,))
win=(view[:,0]*profit_pct).reshape(-1,1) # Column of win
loss=(view[:,0]*loss_pct).reshape(-1,1) # of loss
futw=np.argmax(view>=win, axis=1) # For each line, index of future win or 0
futl=np.argmax(view<=loss, axis=1)
res=(futw>0)*1 # res is 1 where there is a future win
res[(futl>0) & ((futw>futl) | (futw==0))]=-1 # unless a future loss exists sooner
df['slide']=res

实验设置

def gen():
# Something that looks like random variations. With equal opportunities to win/lose...
return pd.DataFrame({'close':100+np.cumsum(np.random.normal(0, 1, (10000,)))})
df=gen()
# Verify column differences between all 4 methods
# 'long_opportunies' for yours
# opt for my 1st version
# slide for my 2nd version with sliding_window_view
# cuminmax for mozway's (but check fails for it. Pity, since timings rock)
def check():
df=gen()
classify_long_opportunities(df)
classOpt(df)
slide(df)
cuminmax(df)
return ((df['long_opportunities']-df.opt)**2).sum(), ((df.opt-df.slide)**2).sum(), ((df.opt-df.cmm)**2).sum()

检查了几十次。所有三种方法(你的和我的两种(总是给出完全相同的结果。

但是时间。。。

计时

方法计时
您的方法14660毫秒
我的第一次240毫秒
我的第二次152毫秒
Mozway40毫秒

IIUC,您可以使用累积最小值/最大值和numpy.select:

import numpy as np
profit_pct = 0.01
loss_pct = 0.01
m1 = df['close'].mul(1+profit_pct).ge(df.loc[::-1,'close'].cummax())
m2 = df['close'].mul(1-loss_pct).le(df.loc[::-1,'close'].cummin())
df['long_opportunities'] = np.select([m2, m1], [0, -1], 1)
print(df)

输出:

close  long_opportunities
0  1.000                   1
1  1.010                  -1
2  1.015                  -1
3  1.000                  -1
4  0.980                   0

最新更新