我对R编程比较陌生,到目前为止这个网站对我很有帮助,但是我找不到一个已经涵盖了我想知道的问题。所以我决定自己发一个问题。
我的问题是:我想找到有效的方法来计算四维数组的累积和,即我有一个四维数组x的数据,并想写一个函数来计算数组x_sum,这样
x_sum [i, j, k, l] = sum_ {ind1 & lt; =我ind2 & lt; = j, ind3 & lt; = k, ind4 & lt; = l} x [ind1、ind2 ind3, ind4]。
我想要使用这个函数数十亿次,这使得它尽可能高效变得非常重要。尽管我已经提出了几种计算总和的方法(见下文),但我怀疑更有经验的R程序员可能能够找到更有效的解决方案。因此,如果有人能提出更好的方法,我将非常感激。
以下是我到目前为止所做的尝试:
我发现了三种不同的实现(每一种都带来了速度的提高)可以工作(见下面的代码):一个在R中使用cumsum()函数(cumsum_4R),另两个在C中完成"繁重工作"的实现(使用.C()接口)。C中的第一个实现仅仅是使用嵌套的for循环和指针算术(cumsumC_4_old)编写总和的幼稚尝试。在第二个c实现(cumsumC_4)中,我尝试使用以下文章
中的思想来调整我的代码。正如您在下面的源代码中看到的,这种适应是相对不平衡的:对于某些维度,我能够替换所有嵌套的For循环,但对于其他维度则不能。你有什么好主意吗?
在三个实现上使用微基准测试,对于大小为40x40x40x40的数组,我得到以下结果:Unit: milliseconds
expr min lq mean median uq
cumsum_4R(x) 976.13258 1029.33371 1064.35100 1051.37782 1074.23234
cumsumC_4_old(x) 174.72868 177.95875 192.75392 184.11121 203.18141
cumsumC_4(x) 56.87169 57.73512 67.34714 63.20269 68.80326
max neval
1381.5832 50
283.2384 50
105.7099 50
附加信息:1)由于这样可以更容易地安装任何需要的软件包,我在我的个人电脑上运行Windows的基准测试,但我计划在我大学的一台电脑上运行完成的模拟,这台电脑运行Linux。
编辑:2)四维数据x[i,j,k,l]实际上是外部函数的两个应用积的结果:首先是矩阵与自身的外部积(即outer(mat,mat)),然后取另一个矩阵(即outer(mat2, mat2, pmin))的两两最小值。那么数据就是product
x = outer(mat, mat) * outer(mat2, mat2, pmin),
。
x (i, j, k, l) =垫(i, j) *垫(k, l) *分钟(mat2 (i, j), mat2 [k, l])
四维数组具有相应的对称性。
3)我首先需要这些累积和的原因是我想要运行一个测试的模拟,我需要对"矩形"的索引进行部分和:我想迭代所有形式为
的和sum_ {k1<= i1 & lt; = m1, k2<= i2 & lt; = m2, k1 & lt; = i3 & lt; = m1, k2 & lt; =预告& lt; = m2} x (i1、i2 i3,预告),
1 & lt; = k1<= m1<= n, 1 & lt; = k2<= m2<= n。为了避免一遍又一遍地计算相同变量的和,我首先计算所有的累积和,然后计算矩形的和作为累积和的函数。你知道更有效的方法吗?编辑到3):为了包括所有可能重要的方面:我还想计算形式
的总和sum_ {k1<= i1 & lt; = m1, k2<= i2 & lt; =平方米,1 & lt; = i3 & lt; = n, 1 & lt; =预告& lt; = n} x (i1、i2 i3,预告)。
(因为我可以使用累积和简单地获得它们,所以我之前没有包括这个规范)。
这是我使用的C代码(我保存为" cumsumC.c "):
#include<R.h>
#include<math.h>
#include <stdio.h>
int min(int a, int b){
if(a <= b) return a;
else return b;
}
void cumsumC_4_old(double* x, int* nv){
int n = *nv;
int n2 = n*n;
int n3 = n*n*n;
//Dim 1
for(int i=0; i<n; i++){
for(int j=0; j<n; j++){
for(int k=0; k<n; k++){
for(int l=1; l<n; l++){
x[i+j*n+k*n2+l*n3] += x[i + j*n +k*n2+(l-1)*n3];
}
}
}
}
//Dim 2
for(int i=0; i<n; i++){
for(int j=0; j<n; j++){
for(int k=1; k<n; k++){
for(int l=0; l<n; l++){
x[i+j*n+k*n2+l*n3] += x[i + j*n +(k-1)*n2+l*n3];
}
}
}
}
//Dim 3
for(int i=0; i<n; i++){
for(int j=1; j<n; j++){
for(int k=0; k<n; k++){
for(int l=0; l<n; l++){
x[i+j*n+k*n2+l*n3] += x[i + (j-1)*n +k*n2+l*n3];
}
}
}
}
//Dim 4
for(int i=1; i<n; i++){
for(int j=0; j<n; j++){
for(int k=0; k<n; k++){
for(int l=0; l<n; l++){
x[i+j*n+k*n2+l*n3] += x[i-1 + j*n +k*n2+l*n3];
}
}
}
}
}
void cumsumC_4(double* x, int* nv){
int n = *nv;
int n2 = n*n;
int n3 = n*n*n;
long ind1, ind2;
long index, indexges = n +(n-1)*n+(n-1)*n2+(n-1)*n3, indexend;
//Dim 1
index = n3;
while(index != indexges){
x[index] += x[index-n3];
index++;
}
//Dim 2
long teilind = n+(n-1)*n;
for(int k=1; k<n; k++){
ind1 = k*n2;
ind2 = ind1 - n2;
for(int l=0; l<n; l++){
index = l*n3;
indexend = teilind+index;
while(index != indexend){
x[index+ind1] += x[index+ind2];
index++;
}
}
}
//Dim 3
ind1 = n;
while(ind1 < n+(n-1)*n){
index = 0;
indexend = indexges - ind1;
ind2 = ind1-n;
while(index < indexend){
x[ind1+index] += x[ind2+index];
index += n2;
}
ind1++;
}
//Dim 4
index = 0;
int i;
long minind;
while(index < indexges){
i = 1;
minind = min(indexges, index+n);
while(index+i < minind){
x[index+i] += x[index+i-1];
i++;
}
index+=n;
}
}
这里是R函数"cumsum_4R"以及在R中调用和比较累积和函数的代码(在Windows;对于Linux,命令dyn.load/dyn。卸料需调整;理想情况下,我想在50^4大小的数组上使用函数,但由于对微基准的调用需要一段时间,所以我在这里选择n=40):
library("microbenchmark")
# dyn.load("cumsumC.so")
dyn.load("cumsumC.dll")
cumsum_4R <- function(x){
return(aperm(apply(apply(aperm(apply(apply(x, 2:4,function(a) cumsum(as.numeric(a))), c(1,3,4) , function(a) cumsum(as.numeric(a))), c(2,1,3,4)), c(1,2,4), function(a) cumsum(as.numeric(a))), 1:3, function(a) cumsum(as.numeric(a))), c(3,4,2,1)))
}
cumsumC_4_old <- function(x){
n <- dim(x)[1]
arr <- array(.C("cumsumC_4_old", res=as.double(x), as.integer(n))$res, dim=c(n,n,n,n))
return(arr)
}
cumsumC_4 <- function(x){
n <- dim(x)[1]
arr <- array(.C("cumsumC_4", res=as.double(x), as.integer(n))$res, dim=c(n,n,n,n))
return(arr)
}
set.seed(1234)
n <- 40
x <- array(rnorm(n^4),dim=c(n,n,n,n))
r <- 6 #parameter for rounding results for comparison
res1 <- cumsum_4R(x)
res2 <- cumsumC_4_old(x)
res3 <- cumsumC_4(x)
print(c("Identical R and C1:", identical(round(res1,r),round(res2,r))))
print(c("Identical R and C2:",identical(round(res1,r),round(res3,r))))
times <- microbenchmark(cumsum_4R(x), cumsumC_4_old(x),cumsumC_4(x),times=50)
print(times)
dyn.unload("cumsumC.dll")
# dyn.unload("cumsumC.so")
谢谢你的帮助!
您确实可以使用原问题中的第2点和第3点来更有效地解决问题。实际上,这使得问题可分离。我所说的可分离性是指方程3中4个和的极限与求和的变量无关。这个和x
是两个矩阵的外积的事实使你能够将方程3中的4倍和分离成两个2倍和的外积。更好的是:用于定义x
的两个矩阵是相同的(由您表示为mat
) -因此两个2倍和给出相同的矩阵,只需计算一次。下面的代码是:
set.seed(1234)
n=40
mat=array(rnorm(n^2),dim=c(n,n))
x=outer(mat,mat)
cumsum_sep=function(x) {
#calculate matrix corresponding to 2-fold sums
#actually it's just one matrix because x is an outer product of mat with itself
tmp=t(apply(apply(x,2,cumsum),1,cumsum))
#outer product of two-fold sums
outer(tmp,tmp)
}
y1=cumsum_4R(x)
#note that cumsum_sep operates on the original matrix mat!
y2=cumsum_sep(mat)
检查结果是否相同
all.equal(y1,y2)
[1] TRUE
给出了基准测试结果
microbenchmark(cumsum_4R(x),cumsum_sep(mat),times=10)
Unit: milliseconds
expr min lq mean median uq max neval cld
cumsum_4R(xx) 2084.454155 2135.852305 2226.59692 2251.95928 2270.15198 2402.2724 10 b
cumsum_sep(x) 6.844939 7.145546 32.75852 14.45762 34.94397 120.0846 10 a
差别太大了!:)