我已经得到了下面这段正常工作的代码:
import numpy as np
arr1 = np.array([(1, 2, 3, 4), (2, 3, 4, 6), (4, 6, 7, 9), (4, 7, 8, 9), (5, 7, 8, 9)])
arr2 = np.array([(1, 3, 4, 5), (2, 3, 4, 5), (3, 4, 6, 7)])
m = []
for c in range(len(arr1)):
s = np.sum(np.any(arr1[c] == arr2[..., None], axis=1), axis=1)
m = np.append(m, np.amax(s, axis=0))
上面的结果如下所示(即对于arr1中的每一行,找到与arr2中的任何行匹配的最大数量,并将该数字添加到输出数组
):print(m)
[3。3.3.2. 1)
我面临的问题是,当arr1增长到数百万行时,即使arr2只有几千行,计算仍然变得非常慢。
我试着用下面的代码进行比较(和许多其他的尝试产生了更糟糕的结果),但这不起作用:
c = (arr1[:, None, ...] == arr2[None, ...])
上面的代码产生的结果非常接近我在进行计数之前期望看到的第1阶段输出(如下所示),但是元素的比较没有正确完成:
print(c)
[[[真假假假假][假假假假][假假假假][假假假假假]]
[[假真真假][真真真假][假False False False]]
[[假假假假][假假假假][假真真假]]
[[假假假假][假假假假][假False False False]]
[[假假假假][假假假假][假False False False []]
关于如何优化代码的第一个片段(这样它就不会使用for循环)与arr1有+/- 2 000 000行,arr2有+/- 7 000行工作的任何建议?
我直接跳到计时:
np.random.seed(0xFFFF)
arr1 = np.random.randint(10, size=(20000, 5))
arr2 = np.random.randint(10, size=(70, 5))
def original(a1, a2):
m = []
for c in range(len(a1)):
s = np.sum(np.any(a1[c] == a2[..., None], axis=1), axis=1)
m = np.append(m, np.amax(s, axis=0))
return m
我将输入大小从你的目标中截断了100倍,以便能够在我的弱笔记本电脑上得到合理的数字。你可以根据需要进行推断。
%timeit original(arr1, arr2)
1.11 s ± 19.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
与@mozway使用sets的解决方案进行比较:
def using_sets(a1, a2):
l1 = [frozenset(i) for i in a1]
l2 = [frozenset(i) for i in a2]
return np.array([max(len(s1.intersection(s2)) for s2 in l2) for s1 in l1])
534 ms ± 6.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
首先要做的是:numpy数组分配不是平摊的,不像list
。这意味着你不应该在循环中使用np.append
(从列表开始没有帮助):
def list_append(a1, a2):
m = []
for c in range(len(a1)):
s = np.sum(np.any(a1[c] == a2[..., None], axis=1), axis=1)
m.append(np.amax(s, axis=0))
return np.array(m)
对于少量的元素来说,这是一个巨大的变化。在实践中,随着输出大小的增加,好处是将O(n^2)减少到O(n):
%timeit list_append(arr1, arr2)
856 ms ± 28.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
但等等!一开始你就知道输出的大小。最好是预先分配整个东西,并将结果存储到正确的索引:
def preallocate(a1, a2):
m = np.empty(a1.shape[0], int)
for c in range(len(a1)):
s = np.sum(np.any(a1[c] == a2[..., None], axis=1), axis=1)
m[c] = np.amax(s, axis=0)
return m
这次的增益要小得多,因为示例中的数组并不大,而且list
已经平摊了很多附加值:
%timeit preallocate(arr1, arr2)
842 ms ± 4.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
现在来解决问题的核心。首先,您可以通过使用np.isin
而不是计算巨大的布尔掩码来减少内存占用和速度:
def using_isin(a1, a2):
m = np.empty(a1.shape[0], int)
for c in range(len(a1)):
s = np.sum(np.isin(a2, a1[c]), axis=1)
m[c] = np.amax(s, axis=0)
return m
这开始接近基于集合的方法:
%timeit using_isin(arr1, arr2)
582 ms ± 1.55 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
如果数组有已排序的行,可以完全避免线性搜索:
def using_sorted(a1, a2):
a1 = np.sort(a1, axis=1)
m = np.empty(a1.shape[0], int)
for c in range(len(a1)):
m[c] = (a1[c, np.clip(np.searchsorted(a1[c], a2), 0, a2.shape[1] - 1)] == a2).sum(1).max()
return m
与np.isin
中实现的线性搜索相比,二进制搜索的开销实际上使这种情况更糟:
%timeit using_sorted(arr1, arr2)
695 ms ± 2.59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
当然最快的解决方法是使用numba:
@njit
def using_numba(a1, a2):
m = np.empty(a1.shape[0], np.int32)
s = np.empty(a2.shape[0], a1.dtype)
for i in range(a1.shape[0]):
for j in range(a2.shape[0]):
s[j] = 0
for k in range(a1.shape[1]):
s[j] += a1[i, k] in a2[j]
m[i] = s.max()
return m
毫不奇怪,这比基于集合的方法快7倍:
%timeit using_numba(arr1, arr2)
73 ms ± 1.88 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
作为题外话,让我们看看你试图用布尔掩码做什么。对于arr1的每个元素,您要检查它是否等于arr2(形状:(2M, 5, 7k, 5)
)的任何元素。如果来自arr1
的元素出现在arr2
行的某个地方,则对其进行计数,因此将any
应用于与arr2
的行(形状:(2M, 5, 7k)
)相对应的维度。然后,您想知道arr1
的给定行中匹配的元素总数,因此对该维度(形状:(2M, 7k)
)求和。最后,您想要的是arr2
(shape:(2M)
)的行之和的最大值。
(arr1[..., None, None] == arr2).any(axis=3).sum(axis=1).max(axis=1)
注意在此过程中需要创建的临时数组的大小。其中至少有两个必须同时存在,因此该方法不太可能用于所需的输入。
你可以尝试使用python集:
l1 = [frozenset(i) for i in arr1]
l2 = [frozenset(i) for i in arr2]
[max(len(s1.intersection(s2)) for s2 in l2) for s1 in l1]
输出:
[3, 3, 3, 2, 1]
但是,老实说,这里你必须进行2M*7K的比较,所以无论如何都要花费时间。