Numpy阵列比较和索引



我有2个不等的数组:

>>> np.size(array1)
4004001
>>> np.size(array2)
1000 

现在,需要将array2中的每个元素与array1中的所有元素进行比较,以找到与array2中该元素最接近的元素。找到此值后,我需要将其存储在1000尺寸的不同数组中 - 大小与Array2相对应的大小。

乏味而粗糙的做法可能是使用一个用于循环并从数组2中获取每个元素,从数组1元素中减去其绝对值,然后取最小值 - 这将使我的代码真的很慢。

我想使用numpy vectorized操作来做到这一点,但我有点撞墙。

要充分使用numpy并行性,我们需要矢量化函数。此外,使用相同的标准(最近(在同一数组(array1(中找到所有值。因此,可以专门在array1中搜索一个特殊的函数。

但是,为了使解决方案更加重复使用,更好地做一个更通用的解决方案,然后将其转换为更特定的解决方案。因此,作为查找最接近值的一般方法,我们从该查找最近的解决方案开始。然后,我们将其转换为更具体的矢量化,以使其立即在多个元素上工作:

import math
import numpy as np
from functools import partial
def find_nearest_sorted(array,value):
    idx = np.searchsorted(array, value, side="left")
    if idx > 0 and (idx == len(array) or math.fabs(value - array[idx-1]) < math.fabs(value - array[idx])):
        return array[idx-1]
    else:
        return array[idx]
array1 = np.random.rand(4004001)
array2 = np.random.rand(1000)
array1_sorted = np.sort(array1)
# Partially apply array1 to find function, to turn the general function
# into a specific, working with array1 only.
find_nearest_in_array1 = partial(find_nearest_sorted, array1_sorted)
# Vectorize specific function to allow us to apply it to all elements of
# array2, the numpy way.
vectorized_find = np.vectorize(find_nearest_in_array1)
output = vectorized_find(array2)

希望这是您想要的,一个新的向量,将array2中的数据映射到array1中的最近值。

最" numpythonic"的方法是使用广播。这是一种计算距离矩阵的快速简便的方法,然后您可以将其拿到绝对值的argmin

array1 = np.random.rand(4004001)
array2 = np.random.rand(1000)
# Calculate distance matrix (on truncated array1 for memory reasons)
dmat = array1[:400400] - array2[:,None]
# Take the abs of the distance matrix and work out the argmin along the  last axis
ix = np.abs(dmat).argmin(axis=1)

dmat的形状:

(1000, 400400)

ix的形状和内容:

(1000,)    
array([237473, 166831,  72369,  11663,  22998,  85179, 231702, 322752, ...])

但是,如果您一次执行此操作,则饿了,并且实际上在我指定的数组大小的8GB机器上不起作用,这就是为什么我减少了array1的大小。

为了使其在内存约束中工作,只需将其中一个阵列切成块,然后依次在每个块(或并行(上应用广播。在这种情况下,我将array2切成10个块:

# Define number of chunks and calculate chunk size
n_chunks = 10
chunk_len = array2.size // n_chunks
# Preallocate output array
out = np.zeros(1000)
for i in range(n_chunks):
    s = slice(i*chunk_len, (i+1)*chunk_len)
    out[s] = np.abs(array1 - array2[s, None]).argmin(axis=1)
import numpy as np
a = np.random.random(size=4004001).astype(np.float16)
b = np.random.random(size=1000).astype(np.float16)
#use numpy broadcasting to compare pairwise difference and then find the min arg in a for each element in b. Finally extract elements from a using the argmin array as indexes. 
output = a[np.argmin(np.abs(b[:,None] -a),axis=1)]

此解决方案虽然简单的记忆力很大。如果在大数组上使用它,则可能需要进一步优化。

最新更新