r-Future doFuture foreach后端问题与mutate有关



当我使用未来的后端运行时,我的mutate块会失败。如果我使用snow后端,突变块会得到正确的评估。如果我使用基数R,它适用于Future。

有没有想过为什么这不起作用?我应该只使用雪作为后端吗?

我将我的会话包括在中

参见下面的简化示例:

library(magrittr)
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(tibble)
library(foreach)
library(future)
library(stringr)
library(doSNOW)
#> Loading required package: iterators
#> Loading required package: snow
tmp_tb <- tibble::tibble(Id = c(1:5),
Sample_color = c("green", "blue", "yellow", "orange", "grey"),
Sample_text = c("n Test1", "Test2", "test 3", "test 4", "test 5"))
tmp_fun <- function(loop_n, df) {
print(paste0(loop_n, "before withCallingHndlearsn"))
status_tb <- tibble::tibble(Foreach_loop = as.character(),
For_loop = as.character(),
Status = as.character())
for (i in seq_len(nrow(df))) {
withCallingHandlers({
withRestarts({
if (i == 2) {
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else if (i == 3) {
tb_test_df <- df %>%
dplyr::mutate(TEST = "")
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else if (i == 4) {
tb_test_df <- df
tb_test_df$TEST <- ""
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)

} else {
stop("this is an error!")
}

}, muffleStop = function() {
message("'stop' muffled")
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Failure")
status_tb <- rbind(status_tb, tmp_status_tb)
assign(x = "status_tb", value = status_tb, envir = parent.frame(n = 4))
})
},
error = function(cond) {
print(cond$message)
invokeRestart("muffleStop")
}
)
}
print(paste0(loop_n, "after withCallingHndlearsn"))
return(status_tb)
}
doFuture::registerDoFuture()
numWorkers <- 2
future::plan(future::multisession, workers = numWorkers, gc = FALSE, earlySignal = TRUE)
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = TRUE, .errorhandling = "pass") %dopar% {
tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> numValues: 2, numResults: 0, stopped: TRUE
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "1before withCallingHndlearsn"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "1after withCallingHndlearsn"
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "2before withCallingHndlearsn"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "2after withCallingHndlearsn"
#> got results for task 1
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> got results for task 2
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#>   fun(accum, result.1, result.2)
#> returning status TRUE
future::plan("default")
output_df <- bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#>    Foreach_loop For_loop Status 
#>           <int>    <int> <chr>  
#>  1            1        1 Failure
#>  2            1        2 Good   
#>  3            1        3 Failure
#>  4            1        4 Good   
#>  5            1        5 Failure
#>  6            2        1 Failure
#>  7            2        2 Good   
#>  8            2        3 Failure
#>  9            2        4 Good   
#> 10            2        5 Failure
numWorkers <- 2
cl <- parallel::makeCluster(numWorkers)
doSNOW::registerDoSNOW(cl)
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = TRUE, .errorhandling = "pass",
.export = c('%>%')) %dopar% {
tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> discovered package(s): 
#> automatically exporting the following variables from the local environment:
#>   tmp_fun, tmp_tb 
#> explicitly exporting variables(s): %>%
#> explicitly exporting package(s): 
#> numValues: 2, numResults: 0, stopped: TRUE
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#>   fun(accum, result.1, result.2)
#> returning status TRUE
parallel::stopCluster(cl)
output_df <- bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#>    Foreach_loop For_loop Status 
#>           <int>    <int> <chr>  
#>  1            1        1 Failure
#>  2            1        2 Good   
#>  3            1        3 Good   
#>  4            1        4 Good   
#>  5            1        5 Failure
#>  6            2        1 Failure
#>  7            2        2 Good   
#>  8            2        3 Good   
#>  9            2        4 Good   
#> 10            2        5 Failure

创建于2021-04-23由reprex包(v2.0.0(

sessionInfo()
#> R version 4.0.0 (2020-04-24)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Red Hat Enterprise Linux
#> 
#> Matrix products: default
#> BLAS:   /opt/R/R_4.0.0/lib64/R/lib/libRblas.so
#> LAPACK: /opt/R/R_4.0.0/lib64/R/lib/libRlapack.so
#> 
#> locale:
#>  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
#>  [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
#>  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
#>  [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
#>  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
#> [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       
#> 
#> attached base packages:
#> [1] stats     graphics  grDevices utils     datasets  methods   base     
#> 
#> other attached packages:
#> [1] doSNOW_1.0.19    snow_0.4-3       iterators_1.0.13 stringr_1.4.0   
#> [5] future_1.21.0    foreach_1.5.1    tibble_3.1.1     dplyr_1.0.5     
#> [9] magrittr_2.0.1  
#> 
#> loaded via a namespace (and not attached):
#>  [1] pillar_1.6.0      compiler_4.0.0    highr_0.9         tools_4.0.0      
#>  [5] digest_0.6.27     evaluate_0.14     lifecycle_1.0.0   pkgconfig_2.0.3  
#>  [9] rlang_0.4.10      reprex_2.0.0      cli_2.4.0         DBI_1.1.1        
#> [13] rstudioapi_0.13   parallel_4.0.0    yaml_2.2.1        xfun_0.22        
#> [17] withr_2.4.2       knitr_1.32        generics_0.1.0    fs_1.5.0         
#> [21] vctrs_0.3.7       globals_0.14.0    tidyselect_1.1.0  doFuture_0.12.0  
#> [25] glue_1.4.2        listenv_0.8.0     R6_2.5.0          parallelly_1.24.0
#> [29] fansi_0.4.2       rmarkdown_2.7     purrr_0.3.4       codetools_0.2-18 
#> [33] ellipsis_0.3.1    htmltools_0.5.1.1 assertthat_0.2.1  utf8_1.2.1       
#> [37] stringi_1.5.3     crayon_1.4.1

问题与withRestarts函数的名称有关。。。。不要使用以马弗开头的名称。

最新更新