计算两个不同长度的2d numpy数组之间每行的元素匹配



我已经得到了下面这段正常工作的代码:

import numpy as np
arr1 = np.array([(1, 2, 3, 4), (2, 3, 4, 6), (4, 6, 7, 9), (4, 7, 8, 9), (5, 7, 8, 9)])
arr2 = np.array([(1, 3, 4, 5), (2, 3, 4, 5), (3, 4, 6, 7)])
m = []
for c in range(len(arr1)):
s = np.sum(np.any(arr1[c] == arr2[..., None], axis=1), axis=1)
m = np.append(m, np.amax(s, axis=0))

上面的结果如下所示(对于arr1中的每一行,找到与arr2中的任何行匹配的最大数量,并将该数字添加到输出数组

):
print(m)

[3。3.3.2. 1)

我面临的问题是,当arr1增长到数百万行时,即使arr2只有几千行,计算仍然变得非常慢。

我试着用下面的代码进行比较(和许多其他的尝试产生了更糟糕的结果),但这不起作用:

c = (arr1[:, None, ...] == arr2[None, ...])

上面的代码产生的结果非常接近我在进行计数之前期望看到的第1阶段输出(如下所示),但是元素的比较没有正确完成:

print(c)

[[[真假假假假][假假假假][假假假假][假假假假假]]

[[假真真假][真真真假][假False False False]]

[[假假假假][假假假假][假真真假]]

[[假假假假][假假假假][假False False False]]

[[假假假假][假假假假][假False False False []]

关于如何优化代码的第一个片段(这样它就不会使用for循环)与arr1有+/- 2 000 000行,arr2有+/- 7 000行工作的任何建议?

我直接跳到计时:

np.random.seed(0xFFFF)
arr1 = np.random.randint(10, size=(20000, 5))
arr2 = np.random.randint(10, size=(70, 5))
def original(a1, a2):
m = []
for c in range(len(a1)):
s = np.sum(np.any(a1[c] == a2[..., None], axis=1), axis=1)
m = np.append(m, np.amax(s, axis=0))
return m

我将输入大小从你的目标中截断了100倍,以便能够在我的弱笔记本电脑上得到合理的数字。你可以根据需要进行推断。

%timeit original(arr1, arr2)
1.11 s ± 19.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

与@mozway使用sets的解决方案进行比较:

def using_sets(a1, a2):
l1 = [frozenset(i) for i in a1]
l2 = [frozenset(i) for i in a2]
return np.array([max(len(s1.intersection(s2)) for s2 in l2) for s1 in l1])
534 ms ± 6.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

首先要做的是:numpy数组分配不是平摊的,不像list。这意味着你不应该在循环中使用np.append(从列表开始没有帮助):

def list_append(a1, a2):
m = []
for c in range(len(a1)):
s = np.sum(np.any(a1[c] == a2[..., None], axis=1), axis=1)
m.append(np.amax(s, axis=0))
return np.array(m)

对于少量的元素来说,这是一个巨大的变化。在实践中,随着输出大小的增加,好处是将O(n^2)减少到O(n):

%timeit list_append(arr1, arr2)
856 ms ± 28.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

但等等!一开始你就知道输出的大小。最好是预先分配整个东西,并将结果存储到正确的索引:

def preallocate(a1, a2):
m = np.empty(a1.shape[0], int)
for c in range(len(a1)):
s = np.sum(np.any(a1[c] == a2[..., None], axis=1), axis=1)
m[c] = np.amax(s, axis=0)
return m

这次的增益要小得多,因为示例中的数组并不大,而且list已经平摊了很多附加值:

%timeit preallocate(arr1, arr2)
842 ms ± 4.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

现在来解决问题的核心。首先,您可以通过使用np.isin而不是计算巨大的布尔掩码来减少内存占用和速度:

def using_isin(a1, a2):
m = np.empty(a1.shape[0], int)
for c in range(len(a1)):
s = np.sum(np.isin(a2, a1[c]), axis=1)
m[c] = np.amax(s, axis=0)
return m

这开始接近基于集合的方法:

%timeit using_isin(arr1, arr2)
582 ms ± 1.55 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

如果数组有已排序的行,可以完全避免线性搜索:

def using_sorted(a1, a2):
a1 = np.sort(a1, axis=1)
m = np.empty(a1.shape[0], int)
for c in range(len(a1)):
m[c] = (a1[c, np.clip(np.searchsorted(a1[c], a2), 0, a2.shape[1] - 1)] == a2).sum(1).max()
return m
np.isin中实现的线性搜索相比,二进制搜索的开销实际上使这种情况更糟:
%timeit using_sorted(arr1, arr2)
695 ms ± 2.59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

当然最快的解决方法是使用numba:

@njit
def using_numba(a1, a2):
m = np.empty(a1.shape[0], np.int32)
s = np.empty(a2.shape[0], a1.dtype)
for i in range(a1.shape[0]):
for j in range(a2.shape[0]):
s[j] = 0
for k in range(a1.shape[1]):
s[j] += a1[i, k] in a2[j]
m[i] = s.max()
return m

毫不奇怪,这比基于集合的方法快7倍:

%timeit using_numba(arr1, arr2)
73 ms ± 1.88 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

作为题外话,让我们看看你试图用布尔掩码做什么。对于arr1的每个元素,您要检查它是否等于arr2(形状:(2M, 5, 7k, 5))的任何元素。如果来自arr1的元素出现在arr2行的某个地方,则对其进行计数,因此将any应用于与arr2的行(形状:(2M, 5, 7k))相对应的维度。然后,您想知道arr1的给定行中匹配的元素总数,因此对该维度(形状:(2M, 7k))求和。最后,您想要的是arr2(shape:(2M))的行之和的最大值。

(arr1[..., None, None] == arr2).any(axis=3).sum(axis=1).max(axis=1)

注意在此过程中需要创建的临时数组的大小。其中至少有两个必须同时存在,因此该方法不太可能用于所需的输入。

你可以尝试使用python集:

l1 = [frozenset(i) for i in arr1]
l2 = [frozenset(i) for i in arr2]
[max(len(s1.intersection(s2)) for s2 in l2) for s1 in l1]

输出:

[3, 3, 3, 2, 1]

但是,老实说,这里你必须进行2M*7K的比较,所以无论如何都要花费时间。

最新更新