正在尝试此代码
library(future)
library(foreach)
ncores <- 3
cl <- parallel::makeCluster(3)
avail <- bigstatsr::FBM(ncores, 1, type = "integer", init = 1)
doFuture::registerDoFuture()
res <- vector("list", 5)
for (i in seq_along(res)) {
while (sum(avail[]) == 0) {
cat("Waiting..n")
Sys.sleep(0.5)
}
ind.avail <- which(avail[] == 1)
cat("Available:", length(ind.avail), "n")
plan(cluster, workers = cl[ind.avail])
foo <- foreach(i = 3:1) %dopar% {
Sys.sleep(i)
}
print(one <- ind.avail[1])
avail[one] <- 0; print(avail[])
res[[i]] <- cluster(workers = cl[one], {
Sys.sleep(5)
avail[one] <- 1
i
})
}
sapply(res, resolved)
parallel::stopCluster(cl)
我得到的错误:Initialization of plan() failed, because the test future used for validation failed. The reason was: Unexpected result (of class ‘NULL’ != ‘FutureResult’) retrieved for ClusterFuture future (label = ‘<none>’, expression = ‘NA’)
。
解释我的例子试图重现我的真实问题:
- 我在两个步骤中循环多次(此处为5次(
- 第一步很容易用foreach并行化
- 第二步不容易并行化,并且依赖于第一步
因此,我的想法是在所有可用集群上并行化第一步,并仅使用一个集群异步运行第二步。在完成此异步作业之前,此群集将不再可用。然后下一个第一步将减少一个可用的集群,以此类推。当第一步没有可用的集群时,它将等待一些异步作业完成并释放一些集群。
我可以重现这一点。我相信您正在设法破坏与主R进程和群集节点的通信,方法是使用一个群集节点调用plan()
,该群集节点保存尚未返回到主R进程的未来结果。(我试图想出一个更简单的例子来说明这种类型的腐败,但如果不花更多的时间,这并不明显。(
未来的框架已经检测到了这一点(因此出现了错误(。我已经更新了future的开发版本,以提供更多关于正在发生的事情的线索和证据:
Error: Initialization of plan() failed, because the test future used for
validation failed. The reason was: Unexpected result (of class ‘character’
!= ‘FutureResult’) retrieved for ClusterFuture future (label =
‘future-plan-test’, expression = ‘NA’): future-grmall. This suggests that
the communication with ClusterFuture worker (‘SOCKnode’ #1) is out of sync.
我认为你可以通过确保在再次使用他们的员工之前收集已解决期货的价值来解决这个问题。plan(cluster, ...)
调用验证是否可以成功解决至少一个未来问题。