我有一个2D Numpy整数数组,如下所示:
a = np.array([[ 3, 0, 2, -1],
[ 1, 255, 1, 2],
[ 0, 3, 2, 2]])
我有一个包含整数键和值的字典,我想用它来用新值替换CCD_ 1的值。dict可能看起来像这样:
d = {0: 1, 1: 2, 2: 3, 3: 4, -1: 0, 255: 0}
我想用d
中的相应值替换与d
中的键匹配的a
的值。换句话说,d
定义了a
中旧(当前)和新(期望)值之间的映射。上面玩具示例的结果是:
a_new = np.array([[ 4, 1, 3, 0],
[ 2, 0, 2, 3],
[ 1, 4, 3, 3]])
实现这一点的有效方法是什么?
这是一个玩具般的例子,但在实践中,数组会很大,它的形状会是例如(1024, 2048)
,字典会有几十个元素(在我的例子中是34个),虽然键是整数,但它们不一定都是连续的,它们可以是负的(就像上面的例子一样)。
我需要在数十万个这样的阵列上执行这种替换,所以它需要快速。然而,字典是预先知道的,并且保持不变,因此渐近地,用于修改字典或将其转换为更合适的数据结构的任何时间都无关紧要。
我目前正在两个嵌套的for
循环中循环数组条目(在a
的行和列上),但必须有更好的方法。
如果映射不包含负值(例如,像示例中的-1),我只会从字典中创建一个列表或数组,其中键是数组索引,然后将其用于高效的Numpy花式索引例程。但由于也存在负值,这是行不通的。
这里有一种方法,如果你有一个小字典/min和max值,这可能会更有效,你可以通过添加数组min:来解决负索引
In [11]: indexer = np.array([d.get(i, -1) for i in range(a.min(), a.max() + 1)])
In [12]: indexer[(a - a.min())]
Out[12]:
array([[4, 1, 3, 0],
[2, 0, 2, 3],
[1, 4, 3, 3]])
注意:这会将for循环移动到查找表,但如果它明显小于实际数组,则速度可能会快得多
制作数组的副本,然后迭代字典项,然后使用布尔索引将新值分配给副本。
import numpy as np
b = np.copy(a)
for old, new in d.items():
b[a == old] = new
这篇文章解决了数组和字典键之间的一对一映射情况。这个想法与a
0中提出的类似,但我们将创建一个包含Python's negative indexing
的更大数组,从而使我们能够在不需要任何偏移的情况下简单地索引输入数组,这应该是显著的改进。
要获得索引器,由于字典保持不变,这将是一次性使用,请使用以下-
def getval_array(d):
v = np.array(list(d.values()))
k = np.array(list(d.keys()))
maxv = k.max()
minv = k.min()
n = maxv - minv + 1
val = np.empty(n,dtype=v.dtype)
val[k] = v
return val
val_arr = getval_array(d)
要获得最终替换项,只需编制索引即可。因此,对于输入数组a
,执行-
out = val_arr[a]
样品运行-
In [8]: a = np.array([[ 3, 0, 2, -1],
...: [ 1, 255, 1, -16],
...: [ 0, 3, 2, 2]])
...:
...: d = {0: 1, 1: 2, 2: 3, 3: 4, -1: 0, 255: 0, -16:5}
...:
In [9]: val_arr = getval_array(d) # one-time operation
In [10]: val_arr[a]
Out[10]:
array([[4, 1, 3, 0],
[2, 0, 2, 5],
[1, 4, 3, 3]])
平铺样本数据的运行时测试-
In [141]: a = np.array([[ 3, 0, 2, -1],
...: [ 1, 255, 1, -16],
...: [ 0, 3, 2, 2]])
...:
...: d = {0: 1, 1: 2, 2: 3, 3: 4, -1: 10, 255: 89, -16:5}
...:
In [142]: a = np.random.choice(a.ravel(), 1024*2048).reshape(1024,2048)
# @Andy Hayden's soln
In [143]: indexer = np.array([d.get(i, -1) for i in range(a.min(), a.max() + 1)])
In [144]: %timeit indexer[(a - a.min())]
100 loops, best of 3: 8.34 ms per loop
# Proposed in this post
In [145]: val_arr = getval_array(d)
In [146]: %timeit val_arr[a]
100 loops, best of 3: 2.69 ms per loop
Numpy可以创建矢量化函数,用于对数组执行映射操作。我不确定这里的哪种方法会有最好的性能,所以我用timeit来计时我的方法。如果你想找出性能最好的方法,我建议你尝试其他几种方法。
# Function to be vectorized
def map_func(val, dictionary):
return dictionary[val] if val in dictionary else val
# Vectorize map_func
vfunc = np.vectorize(map_func)
# Run
print(vfunc(a, d))
你可以通过以下操作来计时:
from timeit import Timer
t = Timer('vfunc(a, d)', 'from __main__ import a, d, vfunc')
print(t.timeit(number=1000))
我对这种方法的结果大约是0.014秒。
编辑:为了好玩,我用你的字典在(1024, 2048)
大小的从-10到10的随机数的numpy数组上试用了这个。单个阵列大约需要四分之一秒。除非您运行大量这样的阵列,否则如果性能达到可接受的水平,则可能不值得进行优化。
另一个选项,尚未进行基准测试:
def replace_values(src: np.ndarray, new_by_old: Dict[int,int]) -> np.ndarray:
dst = np.empty_like(src)
for x in np.unique(src):
dst[src==x] = new_by_old[x]
return dst
这与https://stackoverflow.com/a/46868897/2135504,但由于的原因应该会快一点
- 使用np.empty_like()而不是np.copy()
- 使用np.unique(src)而不是new_by_old.keys()