R并行函数只返回原始矩阵



我尝试从data.frame逐行检查一行中的元素是否相同。

我的真实数据集包含超过100万行和几列负数和正数,以及0和NA。我总共要检查15个数据集,因此使用了并行变体。

不幸的是,我当前的代码只给出了初始矩阵。由于我从未与";paralel";包裹我对此不太了解;clusterExport";争论,但到目前为止没有任何帮助。

因此,问题是我的错误在哪里,并请求帮助。

非常感谢。

x_x <- data.frame("x"=rep(c(1,2,3,4,5,6,7,8,9,10),10),"y"=rep(c(1,2,3,2,1,NA,7,8,9,10),10),"z"=rep(c(1,2,3,4,5,6,7,8,9,10),10))
library(foreach)
library(doParallel)
no_cores <- detectCores() - 1
cl <- makeCluster(no_cores) 
registerDoParallel(cl)
Test_parallel2 <- function(data_01)
{
# data_01 <- x_x
return_data <- data.frame("V1"=matrix(FALSE,nrow = nrow(data_01),ncol = 1))
data_01 <- as.data.frame(t(data_01))
is_true_eigen <- function(data_vec)
{
# data_vec <- c(TRUE,FALSE,TRUE)
return_data_is_true <- TRUE
for(i in 1:length(data_vec))
{
if(data_vec[i] == FALSE)
{
return_data_is_true <- FALSE
break(i)
}
}
return(return_data_is_true)
}

y <- foreach(i=1:ncol(data_01)) %dopar% {

if( (data_01[1,i] == data_01[2,i]) == TRUE & ( mean(as.numeric(data_01[,i]),na.rm = TRUE) == data_01[1,i]) & ( is_true_eigen(colSums(!is.na(t(data_01[,i]))) > 0) == TRUE ) ) 
{
return_data[i,1] <- TRUE  # i=1
}

}

#parallel::clusterExport(cl = cl,varlist = c("y"),envir=environment())

return(return_data)

}
Test_parallel2(x_x)

编辑:

输出应该是每一行的向量,带有true或false(如果行元素相同(

示例:

第1行(来自x_x(:

1|1|1

应返回一个真实

第4行(来自x_x(:

4|2|4

应返回一个错误

第6行(来自x_x(:

6|NA|6atr

应返回一个错误

由于没有其他人插话,我尝试了一下。我在日常工作或爱好项目中不使用foreachdoParallel包,所以我恢复了并行化的常用方法,即parallel包。

我使用了三种并行化方案:串行(无并行化(、叉线程(仅限类Unix系统(和PSOCK(类Windows&Unix系统(。

为了在没有任何并行化的情况下做到这一点,我们可以定义以下函数:

#####################
### Serial Method ###
#####################
### row_equal() ###
row_equal <- function(data){

## Transpose Data ##
data <- as.data.frame(t(data))

## Apply Function Internals ##
n <- lapply(
X = data,
FUN = unique
)

## Number of Unique Values ##
n <- lengths(n, use.names = FALSE)

## Convert to Output to Logical ##
eq <- ifelse(n == 1L, TRUE, FALSE)

## Output ##
return(eq)

}

为了使用fork线程并行化row_equal()函数,我们可以将其修改为:

##############################
### Parallel (FORK) Method ###
##############################
### row_equal_fork() ###
row_equal_fork <- function(data){

## Transpose Data ##
data <- as.data.frame(t(data))

## Cores ##
n_cores <- max(parallel::detectCores() - 1L, 1L)

## Apply Function Internals ##
n <- parallel::mclapply(
X = data,
FUN = unique,
mc.cores = n_cores
)

## Number of Unique Values ##
n <- lengths(n, use.names = FALSE)

## Convert to Output to Logical ##
eq <- ifelse(n == 1L, TRUE, FALSE)

## Output ##
return(eq)

}

不幸的是,简单的fork线程版本只适用于类Unix系统。Windows无法做到这一点。对于Windows,我们需要设置一个PSOCK群集,将我们希望它执行的作业传递给它,然后在完成/失败时停止群集。在这种情况下,作业非常简单,但对于更复杂的作业,您可能需要使用parallel::clusterEvalQ()来传递集群所需的包,或者使用parallel::clusterExport()来传递集群附加对象。

为了使用PSOCK集群并行化row_equal()函数,我们可以将其修改为:

###############################
### Parallel (PSOCK) Method ###
###############################
### row_equal_psock() ###
row_equal_psock <- function(data){

## Transpose Data ##
data <- as.data.frame(t(data))

## Cores ##
n_cores <- max(parallel::detectCores() - 1L, 1L)
cl <- parallel::makeCluster(n_cores, type = "PSOCK")
on.exit(parallel::stopCluster(cl))

## Apply Function Internals ##
n <- parallel::parLapply(
cl = cl,
X = data,
fun = unique
)

## Number of Unique Values ##
n <- lengths(n, use.names = FALSE)

## Convert to Output to Logical ##
eq <- ifelse(n == 1L, TRUE, FALSE)

## Output ##
return(eq)

}

在您的测试数据帧(x_x(的情况下,我在使用函数时得到了以下输出:

row_equal(x_x)
##  [1]  TRUE  TRUE  TRUE FALSE FALSE FALSE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE
## [13]  TRUE FALSE FALSE FALSE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE FALSE
## [25] FALSE FALSE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE FALSE FALSE FALSE
## [37]  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE FALSE FALSE FALSE  TRUE  TRUE
## [49]  TRUE  TRUE  TRUE  TRUE  TRUE FALSE FALSE FALSE  TRUE  TRUE  TRUE  TRUE
## [61]  TRUE  TRUE  TRUE FALSE FALSE FALSE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE
## [73]  TRUE FALSE FALSE FALSE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE FALSE
## [85] FALSE FALSE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE  TRUE FALSE FALSE FALSE
## [97]  TRUE  TRUE  TRUE  TRUE

所有版本的功能都提供相同的输出:

identical(row_equal(x_x), row_equal_fork(x_x))
## [1] TRUE
identical(row_equal(x_x), row_equal_psock(x_x))
## [1] TRUE

然而,请注意,将函数并行化并不一定会使其运行得更快,因为两种并行化方法都会带来开销;特别是使用PSOCK方法(无论如何,在我糟糕的ARM64笔记本电脑上(:

library(microbenchmark)
microbenchmark(
serial = row_equal(x_x),
parallel_fork = row_equal_fork(x_x),
parallel_psock = row_equal_psock(x_x),
times = 100L
)
## Unit: milliseconds
##            expr        min         lq        mean      median          uq        max neval
##          serial   2.425209   5.059251    6.211456    5.545606    6.358625   21.22458   100
##   parallel_fork  76.978126  92.248626  113.667318  117.062001  127.628751  166.58804   100
##  parallel_psock 949.944959 990.536689 1014.325810 1009.042293 1036.322251 1120.35083   100

如果您的数据集包含许多行,那么您可能会开始看到并行方法的一些好处。我内心的纯粹主义者感觉到了一点小麻烦,那就是必须有一种矢量化的方式来做到这一点。

最新更新