我试图读取多个大型csv文件嵌套并行与未来。
我有一台32核的机器,我想设置嵌套并行(5 × 6)与外部5进程,每个进程6核。我正试图利用data.table::fread(.., nThreads = 6)
的隐式并行性。
R包future
提供嵌套并行,我尝试过
library(future)
plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6)))
,但上面的代码实际上每个子进程只使用1个内核:
plan(list(tweak(multisession, workers = 5),
tweak(multisession, workers = 6)))
registerDoFuture()
foreach(i = 1:5) %dopar% {
availableCores()
}
[[1]]
mc.cores
1
[[2]]
mc.cores
1
[[3]]
mc.cores
1
[[4]]
mc.cores
1
[[5]]
mc.cores
1
有办法做到这一点吗?
(此处为未来的reverse维护者)
…但是上面的每个子进程实际上只使用了1个内核:
我看到这里的误解。你想使用nbrOfWorkers()
(fromfuture))而不是availableCores()
(来自并行)-从中按原样重新导出未来)。这将给你你所期望的:
> foreach(i = 1:5) %dopar% {
nbrOfWorkers()
}
[[1]]
[1] 6
...
[[5]]
[1] 6
availableCores()
返回1(1)的原因是未来框架试图防止嵌套并行化错误。它通过设置控制并行工作和CPU内核(包括options(mc.cores = 1L)
)数量的选项和环境变量来实现这一点。这是由availableCores()
正确拾取的。例如,这可以防止使用y <- mclapply(X, FUN)
、cl <- makeCluster(avaiableCores())
或plan(multisession)
的包在并行工作线程中并行运行。相比之下,nbrOfWorkers()
反映plan()
指定的工人数量。在您的例子中,我们在并行工作中设置了plan(multisession, workers = 6)
,从plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6)))
的第二层开始。
为了让自己确信你确实在与你的设置并行运行,你可以采用https://future.futureverse.org/articles/future-3-topologies.html中的一个例子。
现在,并行线程与并行进程(又名并行工作者)不同。您可以将线程看作一种低级得多的并行化机制。重要的是,未来框架没有约束并行工作者中使用的线程数,包括data.table的并行线程数。用途。因此,您需要显式调用:data <- data.table::fread(.., nThreads = 6)
或者,如果您想灵活地使用当前设置,
data <- data.table::fread(.., nThreads = nbrOfWorkers())
以避免过度并行化。或者,您可以重新配置data.table:
## Set the number of parallel threads used by 'data.table'
## (the default is to use all physical CPU cores)
data.table::setDTthreads(nbrOfWorkers())
data <- data.table::fread(..;)
在doFuture(>= 1.0.0),如果将%dopar%
替换为%dofuture%
,则不再需要registerDoFuture()
。因此,并行读取大量CSV文件的要点是:
library(doFuture)
plan(list(tweak(multisession, workers = 5),
tweak(multisession, workers = 6)))
files <- dir(pattern = "*.csv$")
res <- foreach(file = files) %dofuture% {
data.table::setDTthreads(nbrOfWorkers())
data.table::fread(file)
}
尽管如此,请注意瓶颈可能是文件系统而不是CPU。当并行读取文件时,可能会使文件系统不堪重负,最终会减慢而不是加快文件读取速度。有时并行读取两三个文件会更快,但如果读取的文件更多,就会适得其反。因此,您需要使用不同数量的并行工作者进行基准测试。
此外,现在有一些高度专门化的R包,可以高效地将数据文件读取到R中。其中一些支持高效地读取多个文件。在Package就是这样一个例子。