>我有一个 3D 数组,需要迭代它,提取一个 2x2x2 体素大区域并检查是否有任何体素不为零。在这些位置中,我需要区域和索引的独特元素:
import time
import numpy as np
np.random.seed(1234)
def _naive_iterator(array):
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
nx, ny, nz = lookup.shape
for i in range(nx - 1):
for j in range(ny - 1):
for k in range(nz - 1):
n = lookup[i:i + 2, j:j + 2, k:k + 2]
if n.any(): # check if any value in the region is non-zero
yield n.ravel(), i, j, k
# yield set(n.ravel()), i, j, k # `set()` alone takes some time - for testing purposes exclude this
# arrays that shall be used here are in the region of (1000, 1000, 1000) or larger
# arrays are asserted to contain only integer values >= 0.
array = np.random.randint(0, 2, (200, 200, 200), dtype=np.uint8)
for fun in (_naive_iterator, ):
print(f"* {fun}")
for _ in range(2):
tic = time.time()
[x for x in fun(array)]
print(f" ** execution took {time.time() - tic}")
在我的电脑上,这个循环大约需要 24 秒才能运行。(有趣的旁注:如果没有 n.any(),循环只需要 8s,所以也许也有一些优化潜力?
我想过如何通过并行运行它来加快速度。但是,我无法弄清楚如何在不预先生成所有 2x2x2 数组的情况下做到这一点。 我也考虑过使用scipy.ndimage.generic_filter
但这样一来,我只能获得一个图像,例如在我想要包含的所有像素上都有 1,但我必须迭代原始图像才能获得n.ravel()
(理想情况下,人们会直接使用generic_filter
,但我无法在调用函数内获取索引)。
如何加速此循环,可能通过并行化迭代?
没有 n.any(),循环只需要 8 秒,所以也许也有一些优化潜力?
这是因为Numpy 函数对于非常小的数组(如 2x2x2)有很大的开销。Numpy函数的开销约为几微秒,而在主流处理器上,实际n.any()
计算不应超过十几纳秒。通常的解决方案是矢量化操作,以避免许多 Numpy 调用。您可以使用Numba来加速这段代码,并消除大部分 CPython/Numpy 开销。请注意,Numba 目前不支持像pad
这样的所有功能,因此需要解决方法。下面是结果代码:
import time
import numpy as np
import numba as nb
np.random.seed(1234)
@nb.njit('(uint8[:,:,::1],)')
def numba_iterator(lookup):
nx, ny, nz = lookup.shape
for i in range(nx - 1):
for j in range(ny - 1):
for k in range(nz - 1):
n = lookup[i:i + 2, j:j + 2, k:k + 2]
if n.any():
yield n.ravel(), i, j, k
array = np.random.randint(0, 2, (200, 200, 200), dtype=np.uint8)
for fun in (numba_iterator, ):
print(f"* {fun}")
for _ in range(2):
tic = time.time()
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
[x for x in fun(lookup)]
print(f" ** execution took {time.time() - tic}")
这在我的机器上要快很多倍(但仍然很慢)。
我想过如何通过并行运行它来加快速度。
只要使用yield
,这是不可能的,因为生成器本质上是顺序的。
如何加快此循环
一种解决方案可能是在 Numba 中将整个输出生成为 Numpy 数组,以避免创建存储在 CPython 列表中的800 万个 Numpy 对象,这是使用 Numba 优化后代码变慢的主要原因(每次调用n.ravel
都会创建一个新数组)。请注意,生成器通常很慢,因为它们通常需要上下文切换(一种轻量级线程/协程)。就性能而言,最佳解决方案是在循环中动态计算数据。
此外,n.any
和n.ravel
可以在 Numba 中手动重写,以提高效率。事实上,n
数组视图非常小,使用 3 个具有常量编译时间限制的嵌套循环有助于编译器生成快速代码(即它可以展开循环并仅生成处理器可以非常有效地执行的少量指令)。
下面是一个修改后的改进代码(手动计算填充的数组):
@nb.njit('(uint8[:,:,::1],)')
def fast_compute(array):
nx, ny, nz = array.shape
# Padding (with zeros)
lookup = np.zeros((nx+2, ny+2, nz+2), dtype=np.uint8)
for i in range(nx):
for j in range(ny):
for k in range(nz):
lookup[i+1, j+1, k+1] = array[i, j, k]
# Actual computation
size = (nx + 1) * (ny + 1) * (nz + 1)
result = np.empty((size, 8), dtype=np.uint8)
indices = np.empty((size, 3), dtype=np.uint32)
cur = 0
for i in range(nx + 1):
for j in range(ny + 1):
for k in range(nz + 1):
n = lookup[i:i+2, j:j+2, k:k+2]
# Fast manual n.any()
found = False
for i2 in range(2):
for j2 in range(2):
for k2 in range(2):
found |= n[i2, j2, k2]
if found:
# Fast manual n.ravel()
cur2 = 0
for i2 in range(2):
for j2 in range(2):
for k2 in range(2):
result[cur, cur2] = n[i2, j2, k2]
cur2 += 1
indices[cur, 0] = i
indices[cur, 1] = j
indices[cur, 2] = k
cur += 1
return result[:cur].reshape(cur,2,2,2), indices[:cur]
生成的代码相当大,但这是为高性能计算付出的代价。
正如@norok2所指出的,result[:cur]
和indices[:cur]
是引用数组的视图。与分配的数组相比,视图可能非常小。如果这是一个问题,您可以返回副本(例如。result[:cur].copy()
),以避免可能的内存过度消耗。在实践中,这应该不是问题,因为数组是在虚拟内存中分配的,并且只有写入的页面映射到主流系统上的物理内存中(例如。视窗和Linux)。虚拟内存的页面仅在第一次触摸期间(即首次写入项目时)映射到物理内存。现代平台可以分配大量的虚拟内存(例如,在我的主流x86-64 Windows上131072 GiB,在主流x86-64 Linux上甚至更多),而物理内存则更加稀缺(例如,我的机器上为16 GiB)。当没有视图不再引用它时,将释放基础数组。
基准
_naive_iterator: 21.25 s
numba_iterator: 8.10 s
get_windows_and_indices: 1.35 s
fast_compute: 0.13 s
最后一个 Numba 函数比初始函数快163倍,比 @flawr 的矢量化 Numpy 实现快 10 倍。
Numba 实现当然可以是多线程的,但这并不容易做到,因为线程需要写入输出和写入项目的位置(即。cur
) 依赖于其他线程。此外,这将使代码更加复杂。
每当您使用 numpy 时,都应该尽量避免显式循环。这些循环是用 python 编写的,因此通常比矢量化的任何内容都要慢。这样,您就可以将循环推迟到底层 C 函数,这些函数几乎与任何函数一样快。因此,我会用下面这样的事情来处理您的问题。此函数执行与_naive_iterator
大致相同的操作,但以矢量化的方式执行,没有任何python循环:
from numpy.lib.stride_tricks import sliding_window_view
def get_windows_and_indices(array):
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
nx, ny, nz = lookup.shape
x, y, z = np.mgrid[0:nx, 0:ny, 0:nz]
lookup = np.stack([lookup, x, y, z])
out = sliding_window_view(lookup, (2, 2, 2), axis=(1, 2, 3)).reshape(4, -1, 2, 2, 2)
windows = out[0, ...]
ic = out[1, ..., 0, 0, 0]
jc = out[2, ..., 0, 0, 0]
kc = out[3, ..., 0, 0, 0]
mask = windows.any(axis=(1, 2, 3))
return windows[mask], ic[mask], jc[mask], kc[mask]
当然,您还需要以不同的方式思考代码的其余部分,但是如果您想(有效地)使用numpy,矢量化确实是您需要习惯的事情。 另外,我很确定即使上面的这个功能也不是最佳的,绝对可以进一步改进。
在保留功能的同时加快代码速度的最简单方法是使用 Numba。我认为填充本质上是一个装饰步骤,我将在答案结束时单独处理它。
以下是最初提出的代码和朴素的 Numba 加速的更简洁的实现:
import numpy as np
import numba as nb
def i_cubicles_3D_set_OP(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
for i in range(nx):
for j in range(ny):
for k in range(nz):
window = arr[i:i + size, j:j + size, k:k + size]
if window.any():
yield set(window.ravel()), (i, j, k)
i_cubicles_3D_set_OP_nb = nb.njit(i_cubicles_3D_set_OP)
i_cubicles_3D_set_OP_nb.__name__ = "i_cubicles_3D_set_OP_nb"
如果有人对它的维度无关版本感兴趣(以牺牲一些速度为代价),可以写
:def i_cubicles_set_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
return _i_cubicles_set_nb(view.reshape((-1, window_size)), reduced_shape)
@nb.njit
def unravel_index(x, shape):
result = np.zeros(len(shape), dtype=np.int_)
for i, dim in enumerate(shape[::-1], 1):
result[-i] = x % dim
x //= dim
return result
@nb.njit
def not_only_zeros(seq):
# assumes seq is not empty
count = 0
for x in seq:
if x == 0:
count += 1
break # because only unique values
return len(seq) != count
@nb.njit
def _i_cubicles_set_nb(arr, shape):
for i, x in enumerate(arr):
uniques = set(x)
if not_only_zeros(uniques):
yield uniques, unravel_index(i, shape)
这引入了生成输入的跨步(只读)视图的重要技巧,该视图可用于在概念上简化所有循环,但代价是必须手动解开索引。
这与@flawr回答中提出的想法类似。
在 50³ 大小的索引上,我得到以下时间:
np.random.seed(42)
n = 50
arr = np.random.randint(0, 3, (n, n, n), dtype=np.uint8)
def is_equal_i_set(a, b):
return all(x[0] == y[0] and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = i_cubicles_3d_set_OP_nb, i_cubicles_3d_set_OP, i_cubicles_set_nb
base = list(funcs[0](arr))
for func in funcs:
res = list(func(arr))
print(f"{func.__name__:>24} {is_equal_i_set(base, res)!s:>5}", end=' ')
# %timeit -n 1 -r 1 list(func(arr))
%timeit list(func(arr))
# i_cubicles_3d_set_OP_nb True 1 loop, best of 5: 130 ms per loop
# i_cubicles_3d_set_OP True 1 loop, best of 5: 776 ms per loop
# i_cubicles_set_nb True 10 loops, best of 5: 151 ms per loop
表明使用努姆巴非常有效。
<小时 />无唯一性
如果一个人愿意放弃只返回隔间内唯一元素的要求,用隔间内的所有元素替换它们,他确实获得了一些(但不多)速度:
@nb.njit
def i_cubicles_3d_nb(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
for i in range(nx):
for j in range(ny):
for k in range(nz):
window = arr[i:i + size, j:j + size, k:k + size]
if window.any():
yield window.ravel(), (i, j, k)
def i_cubicles_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
return _i_cubicles_nb(view.reshape((-1, window_size)), reduced_shape)
@nb.njit
def unravel_index(x, shape):
result = np.zeros(len(shape), dtype=np.int_)
for i, dim in enumerate(shape[::-1], 1):
result[-i] = x % dim
x //= dim
return result
@nb.njit
def any_nb(arr):
for x in arr:
if x:
return True
return False
@nb.njit
def _i_cubicles_nb(arr, shape):
for i, x in enumerate(arr):
if any_nb(x):
yield x, unravel_index(i, shape)
如以下基准测试所示(在与之前相同的 50³ 大小的输入上):
def is_equal_i(a, b):
return all(np.allclose(x[0], y[0]) and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = i_cubicles_3d_nb, i_cubicles_nb
base = list(funcs[0](arr))
for func in funcs:
res = list(func(arr))
print(f"{func.__name__:>24} {is_equal_i(base, res)!s:>5}", end=' ')
# %timeit -n 1 -r 1 list(func(arr))
%timeit list(func(arr))
# print()
# i_cubicles_3d_nb True 10 loops, best of 5: 116 ms per loop
# i_cubicles_nb True 10 loops, best of 5: 125 ms per loop
<小时 />无产量(无唯一性)
虽然很明显,只有使用 Numba/Cython 才能使与 OP 输出完全匹配的函数更快,但通过放弃 OP 代码的某些功能可以获得许多快速方法。
特别是,在创建生成器时,需要花费大量时间来创建要生成的实际对象。 相同的信息可以一次返回(最重要的是分配),速度显著提高,特别是如果我们跳过创建用于计算唯一元素的容器。
一旦我们接受返回隔间内的所有元素而不是其唯一元素,就可以设计仅 NumPy 矢量化(快速且与维度无关)的方法,以及更快的 Numba(特定于 3d)实现:
def cubicles_np(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
mask = np.any(view, axis=tuple(range(-arr.ndim, 0)))
return view[mask, ...], np.array(np.nonzero(mask)).transpose()
def cubicles_tr_np(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=window + reduced_shape, strides=arr.strides * 2, writeable=False)
mask = np.any(view, axis=tuple(range(arr.ndim)))
return (
view[..., mask].reshape((window_size, -1)).transpose().reshape((-1, *window)),
np.array(np.nonzero(mask)).transpose())
def cubicles_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
values, indexes = _cubicles_nb(view.reshape((-1, window_size)), reduced_shape, arr.ndim)
return values.reshape((-1, *window)), indexes
@nb.njit
def any_nb(arr):
for x in arr:
if x:
return True
return False
@nb.njit
def _cubicles_nb(arr, shape, ndim):
n, k = arr.shape
indexes = np.empty((n, ndim), dtype=np.bool_)
result = np.empty((n, k), dtype=arr.dtype)
count = 0
for i in range(n):
x = arr[i]
if any_nb(x):
indexes[count] = unravel_index(i, shape)
result[count] = x
count += 1
return result[:count].copy(), indexes[:count].copy()
@nb.njit
def any_cubicle_3d_nb(arr, size):
for i in range(size):
for j in range(size):
for k in range(size):
if arr[i, j, k]:
return True
return False
@nb.njit
def cubicles_3d_nb(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
nn = nx * ny * nz
indexes = np.empty((nn, 3), dtype=np.bool_)
result = np.empty((nn, size, size, size), dtype=arr.dtype)
count = 0
for i in range(nx):
for j in range(ny):
for k in range(nz):
x = arr[i:i + size, j:j + size, k:k + size]
if any_cubicle_3d_nb(x, size):
result[count] = x
indexes[count] = i, j, k
count += 1
return result[:count].copy(), indexes[:count].copy()
在 50³ 大小的输入上再次获得的时序确实表明,对于基于 Numba 的方法,拼写循环比循环视图快得多。 事实上,在没有明确循环维度的情况下,仅 NumPy 的方法可能比 Numba 加速方法更快。
请注意,cubicles_3d_nb()
本质上可以看作是@JérômeRichard答案的清理版本。 (实际上,JérômeRichard在我的机器和输入上fast_compute()
的时间 - 加上额外的.copy()
- 似乎表明cubicles_3d_nb()
更有效 - 可能是因为"any"代码短路,并且不需要手动破坏值)。
def is_equal(a, b):
return all(np.allclose(x[0], y[0]) and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = cubicles_3d_nb, cubicles_nb, cubicles_np, cubicles_tr_np
base = funcs[0](arr)
for func in funcs:
res = func(arr)
print(f"{func.__name__:>24} {is_equal(base, res)!s:>5}", end=' ')
%timeit func(arr)
# cubicles_3d_nb True 100 loops, best of 5: 3.82 ms per loop
# cubicles_nb True 10 loops, best of 5: 23 ms per loop
# cubicles_np True 10 loops, best of 5: 24.7 ms per loop
# cubicles_tr_np True 100 loops, best of 5: 16.5 ms per loop
>索引注释如果要一次给出所有结果,那么索引本身对于存储有关非零隔间位置的信息并不是特别有效,除非它们很少。 相反,布尔数组的内存效率更高。 索引需要index_size * ndim * num
(num
是非零隔间的数量,限定为0 < num < prod(shape)
)。 掩蔽需要bool_size * prod(shape)
。 对于 NumPybool_size = 8
而index_size = 64
(可以调整,但通常至少 16 个),所以:index_size = bool_size * k
. 因此,只要:
num < prod(shape) // (k * ndim)
对于3D和典型index_size = 64
,这意味着(num / prod(shape)) < (1 / 24)
,因此如果非零隔间为~5%或更小,索引是有效的。
在速度方面,只要非零隔间不是太少,使用布尔掩码而不是索引可能会导致实现速度更快,但幅度很小(~5 到 ~20%)。
附录:填充
虽然 Numba 不支持np.pad()
,但在 Numba 之外调用任何填充函数都非常简单。
此外,对于某些输入组合,np.pad()
比切片输出上的简单分配慢:
import numpy as np
import numba as nb
@nb.njit
def pad_3d_nb(arr, size=1):
nx, ny, nz = arr.shape
result = np.zeros((nx + 2 * size, ny + 2 * size, nz + 2 * size), dtype=arr.dtype)
for i in range(nx):
for j in range(ny):
for k in range(nz):
result[i + size, j + size, k + size] = arr[i, j, k]
return result
def const_pad(arr, size=1, value=0):
shape = tuple(dim + 2 * size for dim in arr.shape)
mask = tuple(slice(size, dim + size) for dim in arr.shape)
result = np.full(shape, value, dtype=arr.dtype)
result[mask] = arr
return result
np.random.seed(42)
n = 200
k = 10
arr = np.random.randint(0, 3, (n, n, n), dtype=np.uint8)
base = np.pad(arr, (k, k))
print(np.allclose(pad_3d_nb(arr, k), base))
# True
print(np.allclose(const_pad(arr, k), base))
# True
%timeit np.pad(arr, (k, k))
# 100 loops, best of 5: 3.01 ms per loop
%timeit pad_3d_nb(arr, k)
# 100 loops, best of 5: 11.5 ms per loop
%timeit const_pad(arr, k)
# 100 loops, best of 5: 2.53 ms per loop