是否可以使用 Numpy 实现此版本的矩阵乘法?



我希望快速评估下面的函数,它在高级别上类似于矩阵乘法。对于大型矩阵,下面的实现比矩阵的 numpy 乘法慢几个数量级,这让我相信有更好的方法来使用 numpy 实现这一点。有没有办法使用 numpy 函数而不是 for 循环来实现这一点?我正在使用的矩阵在每个维度上都有 10K-100K 元素的范围内,因此非常需要这种优化。

一种方法是使用 3D numpy 数组,但这被证明太大而无法存储。我还研究了np.vectorize,这似乎不合适。

非常感谢您的指导。

编辑:谢谢大家的精彩见解和答案。非常感谢您的投入。将日志移出循环可以极大地改善运行时,有趣的是,k查找非常重要。如果可以的话,我有一个后续问题:如果内部循环表达式变得C[i,j] += A[i,k] * np.log(A[i,k] + B[k,j]),您能看到一种加速的方法吗?日志可以像以前一样移出,但前提是A[i,k]是指数的,这很昂贵,并且消除了移出日志的收益。

import numpy as np
import numba
from numba import njit, prange
@numba.jit(fastmath=True, parallel=True)
def f(A, B):
    
    C = np.zeros((A.shape[0], B.shape[1]))
    for i in prange(A.shape[0]):
        for j in prange(B.shape[1]):
            for k in prange(A.shape[1]):
                
                C[i,j] += np.log(A[i,k] + B[k,j])
                #matrix mult. would be: C[i,j] += A[i,k] * B[k,j]
    return C
#A = np.random.rand(100000, 100000)
#B = np.random.rand(100000, 100000)
#f(A, B)

log(a) + log(b) == log(a * b)以来,您可以通过用乘法替换加法并仅在末尾进行对数来节省大量对数计算,这应该可以节省大量时间。

import numpy as np
import numba as nb
@nb.njit(fastmath=True, parallel=True)
def f(A, B):
C = np.ones((A.shape[0], B.shape[1]), A.dtype)
for i in nb.prange(A.shape[0]):
for j in nb.prange(B.shape[1]):
# Accumulate product
for k in nb.prange(A.shape[1]):
C[i,j] *= (A[i,k] + B[k,j])
# Apply logarithm at the end
return np.log(C)
# For comparison
@nb.njit(fastmath=True, parallel=True)
def f_orig(A, B):
C = np.zeros((A.shape[0], B.shape[1]), A.dtype)
for i in nb.prange(A.shape[0]):
for j in nb.prange(B.shape[1]):
for k in nb.prange(A.shape[1]):
C[i,j] += np.log(A[i,k] + B[k,j])
return C
# Test
np.random.seed(0)
a, b = np.random.random((1000, 100)), np.random.random((100, 2000))
print(np.allclose(f(a, b), f_orig(a, b)))
# True
%timeit f(a, b)
# 36.2 ms ± 2.91 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit f_orig(a, b)
# 296 ms ± 3.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

正如@jdehesa已经指出的,您可以使用以下简化:log(a) + log(b) == log(a * b)但请注意,结果可能会有很大差异。 此外,还有许多方法可以优化此功能。最佳解决方案取决于输入矩阵的大小和所需的数值稳定性。

使用标量并在转置阵列上工作(可实现自动 SIMD 矢量化(

import numpy as np
#from version 0.43 until 0.47 this has to be set before importing numba
#Bug: https://github.com/numba/numba/issues/4689
from llvmlite import binding
binding.set_option('SVML', '-vector-library=SVML')
import numba as nb
@nb.njit(fastmath=True,parallel=True)
def f_orig(A, B):
C = np.zeros((A.shape[0], B.shape[1]))
for i in nb.prange(A.shape[0]):
for j in range(B.shape[1]):
for k in range(A.shape[1]):
C[i,j] += np.log(A[i,k] + B[k,j])
#matrix mult. would be: C[i,j] += A[i,k] * B[k,j]
return C
@nb.njit(fastmath=True,parallel=True)
def f_pre_opt(A, B):
C = np.empty((A.shape[0], B.shape[1]))
B_T=np.ascontiguousarray(B.T)
for i in nb.prange(A.shape[0]):
for j in range(B_T.shape[0]):
acc=1.
for k in range(A.shape[1]):
acc*=(A[i,k] + B_T[j,k])
C[i,j] = np.log(acc)
return C
@nb.njit(fastmath=True, parallel=True)
def f_jdehesa(A, B):
C = np.ones((A.shape[0], B.shape[1]), A.dtype)
for i in nb.prange(A.shape[0]):
for j in nb.prange(B.shape[1]):
# Accumulate product
for k in nb.prange(A.shape[1]):
C[i,j] *= (A[i,k] + B[k,j])
# Apply logarithm at the end
return np.log(C)

计时

# Test
np.random.seed(0)
a, b = np.random.random((1000, 100)), np.random.random((100, 2000))
res_1=f_orig(a, b)
res_2=f_pre_opt(a, b)
res_3=f_jdehesa(a, b)
# True
%timeit f_orig(a, b)
#262 ms ± 1.76 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit f_pre_opt(a, b)
#12.4 ms ± 405 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit f_jdehesa(a, b)
#41 ms ± 2.24 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

对于较大的矩阵,此解决方案远非最佳。为了更好地使用缓存,需要额外的优化,例如逐块计算结果。

矩阵-矩阵乘法的实际实现

相关内容

最新更新