在nd阵列的一个轴上应用1D函数



我想要什么:

我想对任意形状的ndarray应用1D函数,这样它就可以修改某个轴。类似于numpy.fft.fft中的axis参数。

举以下例子:

import numpy as np

def transf1d(f, x, y, out):
    """Transform `f(x)` to `g(y)`.
    This function is actually a C-function that is far more complicated
    and should not be modified. It only takes 1D arrays as parameters.    
    """
    out[...] = (f[None,:]*np.exp(-1j*x[None,:]*y[:,None])).sum(-1)

def transf_all(F, x, y, axis=-1, out=None):
    """General N-D transform.
    Perform `transf1d` along the given `axis`.
    Given the following:
      F.shape == (2, 3, 100, 4, 5)
      x.shape == (100,)
      y.shape == (50,)
      axis == 2
    Then the output shape would be:
      out.shape == (2, 3, 50, 4, 5)
    This function should wrap `transf1d` such that it works on arbitrarily
    shaped (compatible) arrays `F`, and `out`.
    """
    if out is None:
        shape = list(np.shape(F))
        shape[axis] = np.size(y)
    for f, o in magic_iterator(F, out):
        # Given above shapes:
        #   f.shape == (100,)
        #   o.shape == (50,)
        transf1d(f, x, y, o)
    return out

函数transf1d采用1D ndarray f,以及另外两个1D阵列xy。它对f(x)进行从x轴到y轴的傅立叶变换。结果存储在out参数中。

现在,我想将其封装在一个更通用的函数transf_all中,该函数可以采用任意形状的ndarrays和一个axis参数,该参数指定沿哪个轴进行变换。

注意事项:

  • 我的代码实际上是用Cython编写的。理想情况下,magic_iterator在Cython中会很快
  • 函数transf1d实际上是一个C函数,它在out自变量中返回其输出。因此,我无法使用numpy.apply_along_axis
  • 因为transf1d实际上是一个非常复杂的C函数,所以我不能重写它来处理任意数组。我需要将它封装在一个Cython函数中,该函数处理额外的维度
  • 注意,阵列xy的长度可以不同

我的问题:

我该怎么做?我如何在ndarray的任意维度上迭代,以便在每次迭代时都得到一个包含指定axis的1D数组?

我看了nditer,但我不确定它是否真的适合这份工作。

干杯!

import numpy as np

def transf1d(f, x, y, out):
    """Transform `f(x)` to `g(y)`.
    This function is actually a C-function that is far more complicated
    and should not be modified. It only takes 1D arrays as parameters.
    """
    out[...] = (f[None,:]*np.exp(-1j*x[None,:]*y[:,None])).sum(-1)

def transf_all(F, x, y, axis=-1, out=None):
    """General N-D transform.
    Perform `transf1d` along the given `axis`.
    Given the following:
      F.shape == (2, 3, 100, 4, 5)
      x.shape == (100,)
      y.shape == (50,)
      axis == 2
    Then the output shape would be:
      out.shape == (2, 3, 50, 4, 5)
    This function should wrap `transf1d` such that it works on arbitrarily
    shaped (compatible) arrays `F`, and `out`.
    """
    def wrapper(f):
        """
        wrap transf1d for apply_along_axis compatibility
        that is, having a signature of F.shape[axis] -> out.shape[axis]
        """
        out = np.empty_like(y)
        transf1d(f, x, y, out)
        return out
    return np.apply_along_axis(wrapper, axis, F)

尽管我还没有测试过,但我相信这应该能满足你的需求。请注意,在apply_along_axis内部发生的循环具有python级别的性能,所以这只是根据风格而不是性能来矢量化操作。然而,这可能并不重要,假设内部循环使用外部C代码的决定是合理的,因为它首先是一个不平凡的操作。

要回答您的问题:

如果你真的只想在上迭代,除了一个给定的轴,你可以使用:

for s in itertools.product(map(range, arr.shape[:axis]+arr.shape[axis+1:]):
    arr[s[:axis] + (slice(None),) + s[axis:]]

也许还有一种更优雅的方法,但这应该有效。

但是,不要重复:

对于您的问题,我只想重写您的函数,使其在ndarray的给定轴上工作。我认为这应该有效:

def transfnd(f, x, y, axis, out):
    s = list(f.shape)
    s.insert(axis, 1)
    yx = [y.size, x.size] + [1]*(f.ndim - axis - 1)
    out[...] = np.sum(f.reshape(*s)*np.exp(-1j*x[None,:]*y[:,None]).reshape(*yx), axis+1)

这实际上只是您当前实现的概括,但不是一开始就在F中插入一个新的轴,它将它插入到axis(可能有比list(shape)方法更好的方法来做到这一点,但这就是我所能做的。最后,你必须向你的yx外积添加尾随新轴,以匹配你在F中的尾随索引。

我真的不知道如何测试,但形状都可以,所以请测试一下,让我知道它是否有效。

我发现了一种使用Numpy C-API在Cython中迭代除一个轴之外的所有轴的方法(下面的代码)。然而,它并不漂亮。是否值得付出努力取决于内部功能和数据的大小。

如果有人知道Cython中更优雅的方法,请告诉我。

我将其与Eelco的解决方案进行了比较,它们在大型争论中的运行速度相当。对于较小的参数,C-API解决方案更快:

In [5]: y=linspace(-1,1,100);
In [6]: %timeit transf.apply_along(f, x, y, axis=1)
1 loops, best of 3: 5.28 s per loop
In [7]: %timeit transf.transfnd(f, x, y, axis=1)
1 loops, best of 3: 5.16 s per loop

正如您所看到的,对于这个输入,两个函数的速度大致相同。

In [8]: f=np.random.rand(10,20,50);x=linspace(0,1,20);y=linspace(-1,1,10);
In [9]: %timeit transf.apply_along(f, x, y, axis=1)
100 loops, best of 3: 15.1 ms per loop
In [10]: %timeit transf.transfnd(f, x, y, axis=1)
100 loops, best of 3: 8.55 ms per loop

然而,对于较小的输入数组,C-API方法会更快。

代码

#cython: boundscheck=False
#cython: wraparound=False
#cython: cdivision=True
import numpy as np
cimport numpy as np
np.import_array()
cdef extern from "complex.h":
    double complex cexp(double complex z) nogil
cdef void transf1d(double complex[:] f,
                   double[:] x,
                   double[:] y,
                   double complex[:] out,
                   int Nx,
                   int Ny) nogil:
    cdef int i, j
    for i in xrange(Ny):
        out[i] = 0
        for j in xrange(Nx):
            out[i] = out[i] + f[j]*cexp(-1j*x[j]*y[i])

def transfnd(F, x, y, axis=-1, out=None):
    # Make sure everything is a numpy array.
    F = np.asanyarray(F, dtype=complex)
    x = np.asanyarray(x, dtype=float)
    y = np.asanyarray(y, dtype=float)
    # Calculate absolute axis.
    cdef int ax = axis
    if ax < 0:
        ax = np.ndim(F) + ax
    # Calculate lengths of the axes `x`, and `y`.
    cdef int Nx = np.size(x), Ny = np.size(y)
    # Output array.
    if out is None:
        shape = list(np.shape(F))
        shape[axis] = Ny
        out = np.empty(shape, dtype=complex)
    else:
        out = np.asanyarray(out, dtype=complex)
    # Error check.
    assert np.shape(F)[axis] == Nx, 
            'Array length mismatch between `F`, and `x`!'
    assert np.shape(out)[axis] == Ny, 
            'Array length mismatch between `out`, and `y`!'
    f_shape = list(np.shape(F))
    o_shape = list(np.shape(out))
    f_shape[axis] = 0
    o_shape[axis] = 0
    assert f_shape == o_shape, 'Array shape mismatch between `F`, and `out`!'
    # Construct iterator over all but one axis.
    cdef np.flatiter itf = np.PyArray_IterAllButAxis(F, &ax)
    cdef np.flatiter ito = np.PyArray_IterAllButAxis(out, &ax)
    cdef int f_stride = F.strides[axis]
    cdef int o_stride = out.strides[axis]
    # Memoryview to access one slice per iteration.
    cdef double complex[:] fdat
    cdef double complex[:] odat
    cdef double[:] xdat = x
    cdef double[:] ydat = y
    while np.PyArray_ITER_NOTDONE(itf):
        # View the current `x`, and `y` axes.
        fdat = <double complex[:Nx]> np.PyArray_ITER_DATA(itf)
        fdat.strides[0] = f_stride
        odat = <double complex[:Ny]> np.PyArray_ITER_DATA(ito)
        odat.strides[0] = o_stride
        # Perform the 1D-transformation on one slice.
        transf1d(fdat, xdat, ydat, odat, Nx, Ny)
        # Go to next step.
        np.PyArray_ITER_NEXT(itf)
        np.PyArray_ITER_NEXT(ito)
    return out

# For comparison
def apply_along(F, x, y, axis=-1):
    # Make sure everything is a numpy array.
    F = np.asanyarray(F, dtype=complex)
    x = np.asanyarray(x, dtype=float)
    y = np.asanyarray(y, dtype=float)
    # Calculate absolute axis.
    cdef int ax = axis
    if ax < 0:
        ax = np.ndim(F) + ax
    # Calculate lengths of the axes `x`, and `y`.
    cdef int Nx = np.size(x), Ny = np.size(y)
    # Error check.
    assert np.shape(F)[axis] == Nx, 
            'Array length mismatch between `F`, and `x`!'
    def wrapper(f):
        out = np.empty(Ny, complex)
        transf1d(f, x, y, out, Nx, Ny)
        return out
    return np.apply_along_axis(wrapper, axis, F)

使用以下setup.py 构建

from distutils.core import setup
from Cython.Build import cythonize
import numpy as np
setup(
    name = 'transf',
    ext_modules = cythonize('transf.pyx'),
    include_dirs = [np.get_include()],
)

相关内容

最新更新