通过Python中的Numpy阵列迭代的速度是什么?



我想知道是否有更好的方法可以通过numpy数组迭代?我已经定时了嵌套的迭代,每个循环大约需要40-50秒,我想知道是否有更快的方法?我知道通过Numpy阵列循环并不理想,但是我没有想法。我在堆栈溢出方面浏览了许多问题,但所有这些问题最终都使我更加困惑。

我尝试使用tolist()函数将Numpy数组转换为列表,但是运行时间同样较慢,即使不是更糟。

def euc_distance(array1, array2):
    return np.power(np.sum((array1 - array2)**2) , 0.5)
for i in range(N):
    for j,n in enumerate(data2.values): 
        distance = euc_distance(n, D[i]) 
        if distance < Dradius[i] and NormAttListTest[j] == "Attack":
            TP += 1

我的euc_distance函数以数组形式(在我的情况下为5维(输入传递,以输出1维值。我的data2.values是我通过PANDAS框架访问Numpy数组的方式,即[500 000,5]数据框。

(请注意,normattlisttest是一个列表,该列表具有"攻击"的分类数据,并且"标记为每个单独的测试数据"(。

您的问题是您以错误的方式使用numpy,因为numpyMATLAB这样的矢量化计算都涉及。考虑以下代码修改。我用普通的numpy代码替换了您的循环,从而有效地利用了2D数组的矢量化。结果,代码的运行速度快100倍。

import functools
import numpy as np
import time
# decorator to measure running time
def measure_running_time(echo=True):
    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            t_1 = time.time()
            ans = func(*args, **kwargs)
            t_2 = time.time()
            if echo:
                print(f'{func.__name__}() running time is {t_2 - t_1:.2f} s')
            return ans
        return wrapped
    return decorator

def euc_distance(array1, array2):
    return np.power(np.sum((array1 - array2) ** 2), 0.5)
# original function
@measure_running_time()
def calculate_TP_1(N, data2, D, Dradius, NormAttListTest, TP=0):
    for i in range(N):
        for j, n in enumerate(data2):
            distance = euc_distance(n, D[i])
            if distance < Dradius[i] and NormAttListTest[j] == "Attack":
                TP += 1
    return TP
# new version
@measure_running_time()
def calculate_TP_2(N, data2, D, Dradius, NormAttListTest, TP=0):
    # this condition is the same for every i value
    NormAttListTest = np.array([val == 'Attack' for val in NormAttListTest])
    for i in range(N):
        # don't use loop over numpy arrays
        # compute distance for all the rows
        distance = np.sum((data2 - D[i]) ** 2, axis=1) ** .5
        # check conditions for all the row
        TP += np.sum((distance < Dradius[i]) & (NormAttListTest))
    return TP

if __name__ == '__main__':
    N = 10
    NN = 100_000
    D = np.random.randint(0, 10, (N, 5))
    Dradius = np.random.randint(0, 10, (N,))
    NormAttListTest = ['Attack'] * NN
    NormAttListTest[:NN // 2] = ['Defence'] * (NN // 2)
    data2 = np.random.randint(0, 10, (NN, 5))
    print(calculate_TP_1(N, data2, D, Dradius, NormAttListTest))
    print(calculate_TP_2(N, data2, D, Dradius, NormAttListTest))

输出:

calculate_TP_1() running time is 7.24 s
96476
calculate_TP_2() running time is 0.06 s
96476

最新更新