加快熊猫迭代速度,以测试后续元素的条件



给定一个具有三列(C1,C2,C3(和一系列等长正数(coeff(的熊猫数据帧,我正在计算第四列C4,如下所示

def event(data, coeff, rate_low=2, rate_high=2):
bot_col_name = 'C4'
data[bot_col_name] = -1
I = data.index 
for k in range(len(I)-1):
i = I[k]
next_val = data.at[ I[k+1], 'C1']
c = coeff.at[i]
low_bound = next_val - rate_low*c
high_bound = next_val + rate_high*c
for j in range(k+1, len(data)):
if data.at[ I[j], 'C2'] < low_bound:
data.at[i, bot_col_name] = 0 
break
if data.at[ I[j], 'C3'] >= high_bound:
data.at[i, bot_col_name] = 1 
break
return data

换句话说,给定一行,我们计算一定的上限和下限,然后根据我们是首先达到 C2 下的上限还是 C3 的下限来设置相应的行元素。

例如,考虑熊猫表D

C1  C2  C3
0  2   5   5
1  10  12   2
2   8   3  17 
3  30  25   3

现在,如果 Coeff = [3,3,5,7],则在计算 FIRS 行的值时,low_bound为 10-2*3=4,上限为 10+2*3=16。我们现在必须找到最少的索引 i>0,以便 D.loc[i, 'C2'] <4 或 D.loc[i,'C3']>= 16。我们看到第一个这样的 i 是 1,由于这恰好满足第一个条件,我们将此行的新列设置为 0。

不幸的是,上述解决方案效率很低。我尝试通过向后计算值并尝试缓存结果来优化它(有时可以从"过去"值推断出 C4 的值(,但不幸的是它并没有明显好转。

根据我的经验,获得最大性能的最佳方法是尝试在熊猫框架内尽可能多地表达。

有什么有意义的方法可以优化上面的代码吗?

编辑。使用接受答案的代码并替换以下函数可获得最佳结果。

@njit
def get_c4(low_bound, high_bound, c2, c3):
r1 = np.argwhere( c2 < low_bound )
r2 = np.argwhere( c3 >= high_bound )
if len(r1) == 0 and len(r2) == 0:
return -1
elif len(r1) == 0:
return 1
elif len(r2) == 0:
return 0
return int (r1[0] > r2[0])

如果你真的需要一个快速的解决方案,你应该使用numba。numba的替代品是cython。两者都c编译你的 python 代码以使其更快,但我认为numba更简单,它们或多或少具有相同的性能。

编译代码以c/Fortran使numpy/pandas内部函数如此快速的原因。更多信息,请参阅熊猫文档。

让我们首先创建示例:

import numpy as np
import pandas as pd
from numba import njit
df = pd.DataFrame({
'C1': [2, 10, 8, 30],
'C2': [5, 12, 3, 25],
'C3': [5, 2, 17, 3]
})
coeff = pd.Series([3, 3, 5, 7])

然后通过转换为numba答案的代码,我们得到:

@njit
def event_v(data, coeff, rate_low=2, rate_high=2):
out = -np.ones(len(data), dtype=np.int8)
for k in range(len(data) - 1):
next_val = data[k + 1, 0]
c = coeff[k]
low_bound = next_val - rate_low * c
high_bound = next_val + rate_high * c
for j in range(k + 1, len(data)):
if data[j, 1] < low_bound:
out[k] = 0
break
if data[j, 2] >= high_bound:
out[k] = 1
break
return out
df["C4"] = event_v(df.values, coeff.values)

使用 10 000 行进行测试:

n = 10_000
df = pd.DataFrame(np.random.randint(30, size=[n, 3]), columns=["C1", "C2", "C3"])
coeff = pd.Series(np.random.randint(10, size=n))
%timeit event_v(df.values, coeff.values)
3.39 ms ± 1.13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit event(df, coeff) # Code from the question
28.4 s ± 1.02 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

速度快约8500倍

使用 1 000

000 行进行测试:

n = 1_000_000
df = pd.DataFrame(np.random.randint(30, size=[n, 3]), columns=["C1", "C2", "C3"])
coeff = pd.Series(np.random.randint(10, size=n))
%timeit event_v(df.values, coeff.values)
27.6 s ± 1.16 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

我尝试使用问题代码运行它,但 2 个多小时后%timeit没有完成。

你确定算法完全按照你的想法工作吗?特别是在内部循环中,您查找具有 C2<下限或 C3=">= 上限的下一行(例如,如果同一行同时满足两个条件会发生什么?无论如何,假设是这样,通常,正如您所暗示的那样,在 Pandas 中加速任何算法的关键是将所有内容放入 DataFrame 中,然后使用基于列的操作。我会建议如下:

# Setup DataFrame
df = pd.DataFrame({
"C1": [2, 10, 8, 30],  
"C2": [5, 12, 3, 25],
"C3": [5, 2, 17, 3]
})
coeffs = [3, 3, 5, 7]
df['coeff'] = coeffs
df['C1_next'] = df['C1'].shift(-1)

# Fixed Values
rate_low = 2
rate_high = 2 

# Helper Functions
def test_bounds(row, lower, rate):
if lower:
bound = row['C1_next'] - rate * row['coeff']
elif not lower:
bound = row['C1_next'] + rate * row['coeff']
return bound

def earliest_bound(x, lower):
rows_to_search = df[df.index > x.name]
if lower:
slice = rows_to_search[rows_to_search['C2'] < x['lower_bound']]
elif not lower:
slice = rows_to_search[rows_to_search['C3'] >= x['higher_bound']]
if len(slice) > 0:
value = slice.index[0]
else:
value = np.NaN
return value

df['lower_bound'] = df.apply(test_bounds, args=(True, rate_low), axis=1)
df['higher_bound'] = df.apply(test_bounds, args=(False, rate_high), axis=1)
df["earliest_lower_bound_row"] = df.apply(earliest_bound, args=(True, ), axis=1)
df["earliest_higher_bound_row"] = df.apply(earliest_bound, args=(False, ), axis=1)

在这种情况下,以 earliest_lower_bound_row 和 earliest_higher_bound_row 返回的值将是满足该条件的第一行。当然,如果您只想要一个实际的 0/1 值,则可以直接使用这两列中的信息创建。

next_value,low_bound和high_bound可以很容易地矢量化,它们的计算速度非常快。第二部分不容易矢量化,因为它可能需要扫描整个数组的每一行。通过在 numpy 数组中进行比较,可以比您的实现略有改进(对于大 n 来说变得更加显着(。

def get_c4(low_bound, high_bound, c2, c3):
for idx in range(len(c2)):
if c2[idx] < low_bound:
return 0
if c3[idx] >= high_bound:
return 1
return -1
def event_new(data: pd.DataFrame, coeff, rate_low=2, rate_high=2):
data['next_val'] = data['C1'].shift(periods=-1).ffill().astype('int')
data['low_bound'] = (data['next_val'] - rate_low * coeff).astype('int')
data['high_bound'] = (data['next_val'] + rate_high * coeff).astype('int')
c2 = data['C2'].to_numpy()
c3 = data['C3'].to_numpy()
data['C4'] = data.apply(lambda x: get_c4(x.low_bound, x.high_bound, c2[data.index.get_loc(x) + 1:], c3[data.index.get_loc(x) + 1:]), axis=1)
data.drop(columns=['next_val', 'low_bound', 'high_bound'])
return data

基准测试代码:

for n in [1e2, 1e3, 1e4, 1e5, 1e6]:
n = int(n)
df = pd.DataFrame({'C1': random_list(n=n), 'C2': random_list(n=n), 'C3': random_list(n=n)})
coeff = pd.Series(random_list(start=2, stop=7, n=n))
print(f"n={n}:")
print(f"Time org: {timeit.timeit(lambda: event(df.copy(), coeff), number=1):.3f} seconds")
print(f"Time new: {timeit.timeit(lambda: event_new(df.copy(), coeff), number=1):.3f} seconds")

输出:

n=100:
Time org: 0.007 seconds
Time new: 0.012 seconds
n=1000:
Time org: 0.070 seconds
Time new: 0.048 seconds
n=10000:
Time org: 0.854 seconds
Time new: 0.493 seconds
n=100000:
Time org: 7.565 seconds
Time new: 4.456 seconds
n=1000000:
Time org: 216.408 seconds
Time new: 45.199 seconds

你可能会发现pandas函数的用法:IntervalIndex,参考如下:

data.index = pd.IntervalIndex.from_tuples(data['C1'], data['C3'])
I = data.index

您的函数可能会重写为:

def event(data, coeff, rate_low=2, rate_high=2):
for c in coeff:
i = I.map(c)
low_bound = i - rate_low*c
high_bound = i + rate_high*c
for j in range(k+1, len(data)):
if data.at[ I.get_loc(j), 'C2'] < low_bound:
data.at[i, bot_col_name] = 0 
break
if data.at[ I.get_loc(j), 'C3'] >= high_bound:
data.at[i, bot_col_name] = 1 
break

让我知道这是否有帮助!

最新更新