r语言 - sys.sleep in apply and parSapply



我使用的代码在给定时间内中断命令的执行,以便不达到(crunchbase(API限制。

managed_call <- function(f, events = 44L, every = 60L) {
force(f)
minute_ <- rep(NA, events)
function(...) {       
m_dif <- as.numeric(Sys.time() - minute_, units = "secs")
minute_[!is.na(m_dif) & m_dif > every] <<- NA
calls_remaining <- sum(is.na(minute_))
if (!calls_remaining) {
message("Close to API limit, pausing for ", 
round(every - max(m_dif), 3), " seconds")
Sys.sleep(every - max(m_dif))
minute_[which.max(m_dif)] <- NA
minute_[Position(is.na, minute_)] <<- Sys.time()
f(...)
} else {
minute_[Position(is.na, minute_)] <<- Sys.time()
f(...)
}
}
}

当应用正常的 apply 或 lapply 命令时,这种代码的平静给了我以下警告:

Updated <- function(x){is.null(crunchbase_GET(x))}

> abc <- unlist(lapply(websites,Updated))
Close to API limit, pausing for 1.25 seconds
Close to API limit, pausing for 1.119 seconds
...

但是,我尝试了makeCluster和parSapply的另一个选项:

library("parallel")
abc<- logical(100) 
Updated <- function(x){is.null(crunchbase_GET(x))}
cl <- makeCluster(detectCores(), type = "PSOCK")
clusterExport(cl, varlist = "websites")
clusterEvalQ(cl = cl, library(rcrunchbase))
abc <- parSapply(cl = cl, X = websites, FUN = Updated, USE.NAMES = FALSE)

现在不显示警告消息。因此,我想知道是否甚至执行了实际的 Sys.sleep(( 命令,如果没有,是否有可能让我的代码使用 parSapply 运行。

我真的很抱歉,我无法提供可以针对此特定情况重现的良好示例,因为使用 rCrunchbase 并因此检索有关 API 限制等的信息需要user_key。

message不会parSapply"转义",而是丢失,catwarning也是如此。将基本信息从cl子进程传递到父进程的能力很困难。

另一种选择(实际上是扩展,因为它们依赖于parallel(是futurefuture.apply,因为它们确实处理控制台输出。

cl <- parallel::makeCluster(3)
parallel::parLapply(cl, 1:3, function(i) { message("Hello: ", i+100); Sys.getpid(); })
# [[1]]
# [1] 22680
# [[2]]
# [1] 14504
# [[3]]
# [1] 27084

但是future

library(future)        # plan, cluster
library(future.apply)  # future_lapply
# using the same 'cl'
plan(cluster, workers = cl)
future_lapply(1:3, function(i) { message("Hello: ", i+100); Sys.getpid(); })
# Hello: 101
# Hello: 102
# Hello: 103
# [[1]]
# [1] 22680
# [[2]]
# [1] 14504
# [[3]]
# [1] 27084

(变体可以证明catwarning也会转义子进程。

最新更新