我想要什么:
我想对任意形状的ndarray应用1D函数,这样它就可以修改某个轴。类似于numpy.fft.fft
中的axis
参数。
举以下例子:
import numpy as np
def transf1d(f, x, y, out):
"""Transform `f(x)` to `g(y)`.
This function is actually a C-function that is far more complicated
and should not be modified. It only takes 1D arrays as parameters.
"""
out[...] = (f[None,:]*np.exp(-1j*x[None,:]*y[:,None])).sum(-1)
def transf_all(F, x, y, axis=-1, out=None):
"""General N-D transform.
Perform `transf1d` along the given `axis`.
Given the following:
F.shape == (2, 3, 100, 4, 5)
x.shape == (100,)
y.shape == (50,)
axis == 2
Then the output shape would be:
out.shape == (2, 3, 50, 4, 5)
This function should wrap `transf1d` such that it works on arbitrarily
shaped (compatible) arrays `F`, and `out`.
"""
if out is None:
shape = list(np.shape(F))
shape[axis] = np.size(y)
for f, o in magic_iterator(F, out):
# Given above shapes:
# f.shape == (100,)
# o.shape == (50,)
transf1d(f, x, y, o)
return out
函数transf1d
采用1D ndarray f
,以及另外两个1D阵列x
和y
。它对f(x)
进行从x
轴到y
轴的傅立叶变换。结果存储在out
参数中。
现在,我想将其封装在一个更通用的函数transf_all
中,该函数可以采用任意形状的ndarrays和一个axis
参数,该参数指定沿哪个轴进行变换。
注意事项:
- 我的代码实际上是用Cython编写的。理想情况下,
magic_iterator
在Cython中会很快 - 函数
transf1d
实际上是一个C函数,它在out
自变量中返回其输出。因此,我无法使用numpy.apply_along_axis
- 因为
transf1d
实际上是一个非常复杂的C函数,所以我不能重写它来处理任意数组。我需要将它封装在一个Cython函数中,该函数处理额外的维度 - 注意,阵列
x
和y
的长度可以不同
我的问题:
我该怎么做?我如何在ndarray的任意维度上迭代,以便在每次迭代时都得到一个包含指定axis
的1D数组?
我看了nditer
,但我不确定它是否真的适合这份工作。
干杯!
import numpy as np
def transf1d(f, x, y, out):
"""Transform `f(x)` to `g(y)`.
This function is actually a C-function that is far more complicated
and should not be modified. It only takes 1D arrays as parameters.
"""
out[...] = (f[None,:]*np.exp(-1j*x[None,:]*y[:,None])).sum(-1)
def transf_all(F, x, y, axis=-1, out=None):
"""General N-D transform.
Perform `transf1d` along the given `axis`.
Given the following:
F.shape == (2, 3, 100, 4, 5)
x.shape == (100,)
y.shape == (50,)
axis == 2
Then the output shape would be:
out.shape == (2, 3, 50, 4, 5)
This function should wrap `transf1d` such that it works on arbitrarily
shaped (compatible) arrays `F`, and `out`.
"""
def wrapper(f):
"""
wrap transf1d for apply_along_axis compatibility
that is, having a signature of F.shape[axis] -> out.shape[axis]
"""
out = np.empty_like(y)
transf1d(f, x, y, out)
return out
return np.apply_along_axis(wrapper, axis, F)
尽管我还没有测试过,但我相信这应该能满足你的需求。请注意,在apply_along_axis内部发生的循环具有python级别的性能,所以这只是根据风格而不是性能来矢量化操作。然而,这可能并不重要,假设内部循环使用外部C代码的决定是合理的,因为它首先是一个不平凡的操作。
要回答您的问题:
如果你真的只想在上迭代,除了一个给定的轴,你可以使用:
for s in itertools.product(map(range, arr.shape[:axis]+arr.shape[axis+1:]):
arr[s[:axis] + (slice(None),) + s[axis:]]
也许还有一种更优雅的方法,但这应该有效。
但是,不要重复:
对于您的问题,我只想重写您的函数,使其在ndarray
的给定轴上工作。我认为这应该有效:
def transfnd(f, x, y, axis, out):
s = list(f.shape)
s.insert(axis, 1)
yx = [y.size, x.size] + [1]*(f.ndim - axis - 1)
out[...] = np.sum(f.reshape(*s)*np.exp(-1j*x[None,:]*y[:,None]).reshape(*yx), axis+1)
这实际上只是您当前实现的概括,但不是一开始就在F
中插入一个新的轴,它将它插入到axis
(可能有比list(shape)
方法更好的方法来做到这一点,但这就是我所能做的。最后,你必须向你的yx
外积添加尾随新轴,以匹配你在F
中的尾随索引。
我真的不知道如何测试,但形状都可以,所以请测试一下,让我知道它是否有效。
我发现了一种使用Numpy C-API在Cython中迭代除一个轴之外的所有轴的方法(下面的代码)。然而,它并不漂亮。是否值得付出努力取决于内部功能和数据的大小。
如果有人知道Cython中更优雅的方法,请告诉我。
我将其与Eelco的解决方案进行了比较,它们在大型争论中的运行速度相当。对于较小的参数,C-API解决方案更快:
In [5]: y=linspace(-1,1,100);
In [6]: %timeit transf.apply_along(f, x, y, axis=1)
1 loops, best of 3: 5.28 s per loop
In [7]: %timeit transf.transfnd(f, x, y, axis=1)
1 loops, best of 3: 5.16 s per loop
正如您所看到的,对于这个输入,两个函数的速度大致相同。
In [8]: f=np.random.rand(10,20,50);x=linspace(0,1,20);y=linspace(-1,1,10);
In [9]: %timeit transf.apply_along(f, x, y, axis=1)
100 loops, best of 3: 15.1 ms per loop
In [10]: %timeit transf.transfnd(f, x, y, axis=1)
100 loops, best of 3: 8.55 ms per loop
然而,对于较小的输入数组,C-API方法会更快。
代码
#cython: boundscheck=False
#cython: wraparound=False
#cython: cdivision=True
import numpy as np
cimport numpy as np
np.import_array()
cdef extern from "complex.h":
double complex cexp(double complex z) nogil
cdef void transf1d(double complex[:] f,
double[:] x,
double[:] y,
double complex[:] out,
int Nx,
int Ny) nogil:
cdef int i, j
for i in xrange(Ny):
out[i] = 0
for j in xrange(Nx):
out[i] = out[i] + f[j]*cexp(-1j*x[j]*y[i])
def transfnd(F, x, y, axis=-1, out=None):
# Make sure everything is a numpy array.
F = np.asanyarray(F, dtype=complex)
x = np.asanyarray(x, dtype=float)
y = np.asanyarray(y, dtype=float)
# Calculate absolute axis.
cdef int ax = axis
if ax < 0:
ax = np.ndim(F) + ax
# Calculate lengths of the axes `x`, and `y`.
cdef int Nx = np.size(x), Ny = np.size(y)
# Output array.
if out is None:
shape = list(np.shape(F))
shape[axis] = Ny
out = np.empty(shape, dtype=complex)
else:
out = np.asanyarray(out, dtype=complex)
# Error check.
assert np.shape(F)[axis] == Nx,
'Array length mismatch between `F`, and `x`!'
assert np.shape(out)[axis] == Ny,
'Array length mismatch between `out`, and `y`!'
f_shape = list(np.shape(F))
o_shape = list(np.shape(out))
f_shape[axis] = 0
o_shape[axis] = 0
assert f_shape == o_shape, 'Array shape mismatch between `F`, and `out`!'
# Construct iterator over all but one axis.
cdef np.flatiter itf = np.PyArray_IterAllButAxis(F, &ax)
cdef np.flatiter ito = np.PyArray_IterAllButAxis(out, &ax)
cdef int f_stride = F.strides[axis]
cdef int o_stride = out.strides[axis]
# Memoryview to access one slice per iteration.
cdef double complex[:] fdat
cdef double complex[:] odat
cdef double[:] xdat = x
cdef double[:] ydat = y
while np.PyArray_ITER_NOTDONE(itf):
# View the current `x`, and `y` axes.
fdat = <double complex[:Nx]> np.PyArray_ITER_DATA(itf)
fdat.strides[0] = f_stride
odat = <double complex[:Ny]> np.PyArray_ITER_DATA(ito)
odat.strides[0] = o_stride
# Perform the 1D-transformation on one slice.
transf1d(fdat, xdat, ydat, odat, Nx, Ny)
# Go to next step.
np.PyArray_ITER_NEXT(itf)
np.PyArray_ITER_NEXT(ito)
return out
# For comparison
def apply_along(F, x, y, axis=-1):
# Make sure everything is a numpy array.
F = np.asanyarray(F, dtype=complex)
x = np.asanyarray(x, dtype=float)
y = np.asanyarray(y, dtype=float)
# Calculate absolute axis.
cdef int ax = axis
if ax < 0:
ax = np.ndim(F) + ax
# Calculate lengths of the axes `x`, and `y`.
cdef int Nx = np.size(x), Ny = np.size(y)
# Error check.
assert np.shape(F)[axis] == Nx,
'Array length mismatch between `F`, and `x`!'
def wrapper(f):
out = np.empty(Ny, complex)
transf1d(f, x, y, out, Nx, Ny)
return out
return np.apply_along_axis(wrapper, axis, F)
使用以下setup.py
构建
from distutils.core import setup
from Cython.Build import cythonize
import numpy as np
setup(
name = 'transf',
ext_modules = cythonize('transf.pyx'),
include_dirs = [np.get_include()],
)