我有一个股票数据集,为了简化这个问题,它只有一个"关闭"列,看起来像这样:
import pandas as pd
df = pd.DataFrame({'close': [1, 1.01, 1.015, 1.0, 0.98]})
print(df)
close
0 1.000
1 1.010
2 1.015
3 1.000
4 0.980
现在我想对所有数据点进行分类,看它们是否是有利可图的长期机会。有利可图的长期机会意味着,未来的价格将达到一个特定的水平,该水平将大于或等于指定的获胜水平。与此相反,如果之前达到亏损水平,它将无法盈利。
我目前正在使用这个功能:
def classify_long_opportunities(
df: pd.DataFrame,
profit_pct: float = 0.01, # won after one percent
loss_pct: float = 0.01, # lost after one percent
):
res = []
for t in df[['close']].itertuples():
idx, c = t # extract index and close price
win = c + c * profit_pct # calculate the desired win level
loss = c - c * loss_pct # "" loss level
for e in df[['close']].loc[idx:].itertuples(): # iterate through the future prices
_, p_c = e # extract posterior closing price
if c < win <= p_c: # found a fitting profit level
res.append(1)
break
elif c > loss >= p_c: # found a losing level
res.append(-1)
break
else: # didn't found fitting level at all
res.append(0)
df['long_opportunities'] = res
功能分类正确:
classify_long_opportunities(df=df)
print(df)
close long_opportunities
0 1.000 1
1 1.010 -1
2 1.015 -1
3 1.000 -1
4 0.980 0
但速度很慢。如何使用矢量化(如numpy.where
、numpy.select
或pandas
函数(来优化此函数?
代码优化
保持完全相同的想法,包括迭代,你的代码可以被矢量化一点,至少失去了内部的循环
def classOpt(df, profit_pct=1.01, loss_pct=0.99):
vals=df.close.values
res=[]
for i in range(len(df)):
win=vals[i]*profit_pct
loss=vals[i]*loss_pct
futw=np.argmax(vals[i:]>=win)
futl=np.argmax(vals[i:]<=loss)
if (futw>0) and (futl==0 or futl>futw):
res.append(1)
elif (futl>0) and (futw==0 or futw>futl):
res.append(-1)
else:
res.append(0)
df['opt']=res
这个想法是,在每个阶段,至少要"工作";矢量化的";未来值数组的方式。所以在第i阶段,在vals[i:]
上。我们得到一个布尔数组,表示哪个未来值是获胜的vals[i:]>=win
。哪一个是损失vals[i:]<=loss
。
有了np.argmax
,我们可以很容易地知道这种输赢何时发生。np.argmax(vals[i:]>=win)
。注意,由于我们在未来的值中包含了列i(实际上是一个sentinel(,所以我们知道第一个布尔值必须是False。因此,如果np.argmax(vals[i:]>=win)
为0,那就意味着未来不会有胜利。如果不是0,则为将来第一次获胜的天数。
同样,对于未来损失
所以,如果futw
是非零的,并且futl
是0或者大于futw
,则结果是1。也就是说,如果有一场胜利即将到来,要么没有损失,要么在未来比胜利更进一步的损失(再次,我觉得这是一个奇怪的规则,但这是你的代码之一(
对称情况是a-1。否则为0。
滑动窗口方法
(注意:这是我在一两周内第四次在SO问题中使用此函数。有点重复:-(。在选定的答案中,顺便说一句,到目前为止,它真的很有效。我担心这一次,如果mozway设法纠正结果差异,它将不会顺利进行(。
该方法基于np.lib.stride_tricks.sliding_window_view
函数。
如果M
是[1,2,3,10,20,30,40]
,则sliding_window_view(M, (3,))
是
[[1, 2, 3],
[2, 3, 10],
[3, 10, 20],
[10, 20, 30],
[20, 30, 40]]
我想你已经看到了它如何对具有未来价值的计算有用。
它的一个美妙之处在于,它只是一种景色。因此,没有真正为这个(可能是巨大的(数组分配内存。
在您的情况下,因为我们想要所有未来的值,所以我们需要len(df)
列。既然我们想要,即使是最后一行,我们也需要首先用一些NaN填充这些值。len(df)-1
NaN精确,因此最后一行可以具有与第一行一样多的(无效(预测。
然后我们有一个len(df)×len(df)
视图。第一列为实际值。其他每列都是未来的值,在D+1,D+2。。。
从那时起,我们只需要做与以前完全相同的事情,使用argmax(...>win)
。
这是代码
def slide(df, profit_pct=1.01, loss_pct=0.99):
n=len(df)
valswithnan=np.concatenate([df.close.values, [np.nan]*(n-1)])
view=np.lib.stride_tricks.sliding_window_view(valswithnan, (n,))
win=(view[:,0]*profit_pct).reshape(-1,1) # Column of win
loss=(view[:,0]*loss_pct).reshape(-1,1) # of loss
futw=np.argmax(view>=win, axis=1) # For each line, index of future win or 0
futl=np.argmax(view<=loss, axis=1)
res=(futw>0)*1 # res is 1 where there is a future win
res[(futl>0) & ((futw>futl) | (futw==0))]=-1 # unless a future loss exists sooner
df['slide']=res
实验设置
def gen():
# Something that looks like random variations. With equal opportunities to win/lose...
return pd.DataFrame({'close':100+np.cumsum(np.random.normal(0, 1, (10000,)))})
df=gen()
# Verify column differences between all 4 methods
# 'long_opportunies' for yours
# opt for my 1st version
# slide for my 2nd version with sliding_window_view
# cuminmax for mozway's (but check fails for it. Pity, since timings rock)
def check():
df=gen()
classify_long_opportunities(df)
classOpt(df)
slide(df)
cuminmax(df)
return ((df['long_opportunities']-df.opt)**2).sum(), ((df.opt-df.slide)**2).sum(), ((df.opt-df.cmm)**2).sum()
检查了几十次。所有三种方法(你的和我的两种(总是给出完全相同的结果。
但是时间。。。
计时
方法 | 计时 | |
---|---|---|
您的方法 | 14660毫秒 | |
我的第一次 | 240毫秒 | |
我的第二次 | 152毫秒 | |
Mozway | 40毫秒 |
IIUC,您可以使用累积最小值/最大值和numpy.select
:
import numpy as np
profit_pct = 0.01
loss_pct = 0.01
m1 = df['close'].mul(1+profit_pct).ge(df.loc[::-1,'close'].cummax())
m2 = df['close'].mul(1-loss_pct).le(df.loc[::-1,'close'].cummin())
df['long_opportunities'] = np.select([m2, m1], [0, -1], 1)
print(df)
输出:
close long_opportunities
0 1.000 1
1 1.010 -1
2 1.015 -1
3 1.000 -1
4 0.980 0