如何使用numpy改进自定义函数矢量化?



我是python的新手,对矢量化更是新手。我尝试对一个自定义相似性函数进行矢量化,该函数应该返回输入数组中每行之间的成对相似性矩阵。

进口:

import numpy as np
from itertools import product
from numpy.lib.stride_tricks import sliding_window_view
输入>:
np.random.seed(11)
a = np.array([0, 0, 0, 0, 0, 10, 0, 0, 0, 50, 0, 0, 5, 0, 0, 10])
b = np.array([0, 0, 5, 0, 0, 10, 0, 0, 0, 50, 0, 0, 10, 0, 0, 5])
c = np.array([0, 0, 5, 1, 0, 20, 0, 0, 0, 30, 0, 1, 10, 0, 0, 5])
m = np.array((a,b,c))

输出:

custom_func(m)
array([[   0,  440, 1903],
[ 440,    0, 1603],
[1903, 1603,    0]])

功能:

def custom_func(arr):
diffs = 0
max_k = 6

for n in range(1, max_k):
arr1 = np.array([np.sum(i, axis = 1) for i in sliding_window_view(arr, window_shape = n, axis = 1)])

# this function uses np.maximum and np.minimum to subtract the max and min elements (element-wise) between two rows and then sum up the entire of that subtraction
diffs += np.sum((np.array([np.maximum(arr1[i[0]], arr1[i[1]]) for i in product(np.arange(len(arr1)), np.arange(len(arr1)))]) - np.array([np.minimum(arr1[i[0]], arr1[i[1]]) for i in product(np.arange(len(arr1)), np.arange(len(arr1)))])), axis = 1) * n

diffs = diffs.reshape(len(arr), -1)

return diffs

这个函数非常简单,它总结了N个滑动窗口中最大行和最小行之间的元素差异。这个函数比我在发现向量化(for loops and pandas dataframes)之前使用的函数要快得多。

我的第一个想法是找出一种方法来找到最小和最大的数组在一个单一的传递,因为我目前认为它必须做两个传递,但我无法弄清楚如何。此外,在我当前的函数中有一个for循环,因为我需要为多个N个滑动窗口执行此操作,并且我不确定如何在没有循环的情况下执行此操作。

任何帮助都是感激的!

以下是您可以应用于代码的几个优化:

  • 使用Numba的JIT加快计算速度,用嵌套循环
  • 代替product调用
  • 使用更高效的滑动窗口算法(更好的复杂性)
  • 避免计算多个时间环内productarrange
  • 减少隐式临时数组的数量分配(和数组Numpy调用)
  • 不计算diffs的下三角形部分,因为它总是对称的
    (复制上三角形部分)
  • 使用基于整数的索引而不是缓慢缓慢的浮点索引

结果代码如下:

import numpy as np
from itertools import product
from numpy.lib.stride_tricks import sliding_window_view
import numba as nb
@nb.njit
def custom_func_fast(arr):
h, w = arr.shape[0], arr.shape[1]
diffs = np.zeros((h, h), dtype=arr.dtype)
max_k = 6
for n in range(1, max_k):
arr1 = np.empty(shape=(h, w-n+1), dtype=arr.dtype)
for i in range(h):
# Efficient sliding window algorithm
assert w >= n
s = np.sum(arr[i, 0:n])
arr1[i, 0] = s
for j in range(n, w):
s -= arr[i, j-n]
s += arr[i, j]
arr1[i, j-n+1] = s
# Efficient distance matrix computation
for i in range(h):
for j in range(i+1, h):
s = 0
for k in range(w-n+1):
s += np.abs(arr1[i,k] - arr1[j,k])
diffs[i, j] += s * n
# Fill the lower triangular part
for i in range(h):
for j in range(i):
diffs[i, j] = diffs[j, i]
return diffs

结果代码快了290倍对于我的机器上的示例输入数组。

您可以从移除第一个列表推导式开始:

arr1 = sliding_window_view(arr, window_shape = n, axis = 1).sum(axis=2)

我不打算碰那行长长的diffs:(

最新更新