r-在每次迭代期间,将循环与更新并行



我有一些R代码,它将美国所有州的人口普查人口统计数据放在一个列表对象中。块级别的代码可能需要一周的时间才能作为顺序循环运行,因为大约有11M个块,所以我试图将循环并行化,使其更快。我用这个实现了这个目标:

states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
"ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
"MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
"ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
"VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
plan(multiprocess)
ptm <- proc.time()
CensusObj_block_age_sex = list()
CensusObj_block_age_sex[states] <- future_lapply(states, function(s){
county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
censusObj[[s]] <- list(state = s, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
}
)

但是,我需要使它更加健壮。有时,人口普查API会出现问题,所以我希望CensusObj在每次状态迭代时都会更新,这样,如果出现错误,我就不会丢失已完成的数据。这样,如果出现问题(比如我把"WY"拼写为"WU"(,我就可以在剩余状态上重新开始循环

有可能以某种方式做到这一点吗?我对其他并行化方法持开放态度。


上面的代码运行,但似乎遇到了内存问题:

Error: Failed to retrieve the value of MultisessionFuture (future_lapply-3) from cluster RichSOCKnode #3 (PID 80363 on localhost ‘localhost’). The reason reported was ‘vector memory exhausted (limit reached?)’. Post-mortem diagnostic: A process with this PID exists, which suggests that the localhost worker is still alive.

我的.Renviron中有R_MAX_VSIZE=8Gb,但我不确定这将如何在我的机器上的8个核心之间分配。这一切都表明,我需要存储每次迭代的结果,而不是试图将其全部保存在内存中,然后在最后将对象附加在一起。

这里有一个解决方案,它使用doParallel(具有UNIX系统的选项,但您也可以在Windows上使用它,请参阅此处(和foreach,分别存储每个状态的结果,然后读取单个文件并将它们组合成一个列表。

library(doParallel)
library(foreach)
path_results <- "my_path"
ncpus = 8L
registerDoParallel(cores = ncpus)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
"ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
"MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
"ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
"VT","VA","WA","WV","WI","WY","DC","PR")
results <- foreach(state = states) %dopar% {
county <- census_geo_api(key = "XXX", state = state, geo = "county", age = TRUE, sex = TRUE)
tract  <- census_geo_api(key = "XXX", state = state, geo = "tract",  age = TRUE, sex = TRUE)
block  <- census_geo_api(key = "XXX", state = state, geo = "block",  age = TRUE, sex = TRUE)
results <- list(state = state, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)

# store the results as rds
saveRDS(results,
file = paste0(path_results, "/", state, ".Rds"))

# remove the results
rm(county)
rm(tract)
rm(block)
rm(results)
gc()

# just return a string
paste0("done with ", state)
}
library(purrr)
# combine the results to a list
result_files <- list.files(path = path_results)
CensusObj_block_age_sex <- set_names(result_files, states) %>% 
map(~ readRDS(file = paste0(path_results, "/", .x)))

如果发生API错误,您可以在future_lapply中使用tryCatch来尝试重新启动计算,最多可使用maxtrials
在结果列表中,您可以获得每次计算的试验次数和最终状态OKError:

states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
"ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
"MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
"ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
"VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
#> Le chargement a nécessité le package : future
plan(multiprocess)
ptm <- proc.time()
maxtrials <- 3
census_geo_api <-
function(key = "XXX",
state = s,
geo = "county",
age = TRUE,
sex = TRUE) {
paste(state,'-', geo)
}

CensusObj_block_age_sex <- future_lapply(states, function(s) {
ntrials <- 1
while (ntrials <= maxtrials) {
hasError <- tryCatch({
#simulate random error
if (runif(1)>0.3) {error("API failed")}
county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
},
error = function(e)
e)
if (inherits(hasError, "error")) {
ntrials <- ntrials + 1
} else { break}

}

if (ntrials > maxtrials) {
res <- list(state = s, status = 'Error', ntrials = ntrials-1, age = NA, sex = NA, block = NA, tract = NA, county = NA)
} else  {
res <- list(state = s, status = 'OK'    , ntrials = ntrials, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
}
res
}
)
CensusObj_block_age_sex[[1]]
#> $state
#> [1] "AL"
#> 
#> $status
#> [1] "OK"
#> 
#> $ntrials
#> [1] 3
#> 
#> $age
#> [1] TRUE
#> 
#> $sex
#> [1] TRUE
#> 
#> $block
#> [1] "AL - block"
#> 
#> $tract
#> [1] "AL - tract"
#> 
#> $county
#> [1] "AL - county"
<sup>Created on 2020-08-19 by the [reprex package](https://reprex.tidyverse.org) (v0.3.0)</sup>

我的一个可能的解决方案是将CensusObj的值记录到一个文本文件中,即在每次迭代中打印CensusObj。doSNOW包可用于日志记录,例如

library(doSNOW)
cl <- makeCluster(1, outfile="abc.out")
registerDoSNOW(cl)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
"ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
"MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
"ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
"VT","VA","WA","WV","WI","WY","DC","PR")
foreach(i=1:length(states), .combine=rbind, .inorder = TRUE) %dopar% {
county <- "A"
tract  <- "B"
block  <- "C"
censusObj <- data.frame(state = states[i], age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
# edit: print json objects to easily extract from the file
cat(sprintf("%sn",rjson::toJSON(censusObj)))
}
stopCluster(cl)

这将在abc.out中记录censusObj的值,如果程序崩溃,也会记录错误,但您将在abc.out中获得记录的censusObj的最新值。

以下是日志文件中最后一次迭代的输出:

Type: EXEC {"state":"PR","age":true,"sex":true,"block":"C","tract":"B","county":"A"} Type: DONE

Type:EXEC表示迭代已经开始,Type:DONE表示执行完成。cat的结果将出现在每次迭代的这两个语句之间。现在,可以从日志文件中提取CensusObj的值,如下所示:

Lines = readLines("abc.out")
results = list()
for(i in Lines){
# skip processing logs created by doSNOW
if(!startsWith(i, "starting") && !startsWith(i, "Type:")){
results = rlist::list.append(results, jsonlite::fromJSON(i))      
}
}

results将包含abc.out.中打印的所有值的元素

> head(results, 1)
[[1]]
[[1]]$state
[1] "AL"
[[1]]$age
[1] TRUE
[[1]]$sex
[1] TRUE
[[1]]$block
[1] "C"
[[1]]$tract
[1] "B"
[[1]]$county
[1] "A"

这不是一个非常干净的解决方案,但有效。

最新更新