我已经阅读了一些关于这些主题的问题以及一些教程,但未能解决我的问题,所以决定问问自己。
我有大量类型的大文件,比如 A、B、C;在某些情况下,我需要将 B、C 与 A 连接起来。我在具有 64 个 CPU 和 240GB 的远程服务器上工作,所以我自然而然地想并行使用它的电源和处理。我掌握的一个重要知识是,如果a_i文件只能与 b_i 成功连接,则 B b_(i+1),C 也是如此。 我最初的尝试是为"a_i"文件设置一个"join_i"函数,然后并行运行它(我有 448 个文件)。但是,没有显着的时间改进,事实上,当我观察性能时 - 可悲的是,CPU 的加载百分比非常低。就我所能深入研究的问题而言,我认为瓶颈是IO,特别是因为所有文件都很大。这是一个有效的假设吗? 无论如何,在第二次尝试时,我决定按顺序浏览每个文件,但在每次迭代中使用并行优势。然而,经过多次尝试,我在这里也没有任何运气。我试图在下面做一个最小的例子,其中并行要慢得多(事实上,在我的真实数据上它会冻结)。这是怎么回事?是代码错误还是对 R 中并行工作方式的更深层次的误解?另外,我尝试了一些multidplyr和mclapply,但在这两种情况下都没有运气。 我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 解压缩),而连接大约需要 10 秒。鉴于此,这里最好的策略是什么? 提前感谢!
library(dplyr)
A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
chunk_join=function(i, A, B, C)
{
A_i=A %>% filter(X2==i)
B_i=B %>% filter(X2==i) %>% select(X1, X3)
C_i=C %>% filter(X2==i) %>% select(X1, X3)
join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}
library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)
# not parallel
s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %do%
{
join_i=chunk_join(j, A, B, C)
}
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))
# parallel
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %dopar%
{
join_i=chunk_join(j, A, B, C)
}
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))
R=rbind(r1, r2)
T=rbind(t1, t2)
R
T
在我的服务器上,%do% 大约 5 秒,%dopar% 超过 1m。请注意,这是针对联接本身的,甚至没有考虑创建集群的时间。 顺便问一下,有人可以评论我应该有多少个集群吗?比如说,我在 X 个偶数大小的块上对数据进行分区,并有 Y CPU 可用 - 我应该尽可能多地放置 Y - 还是 X,还是其他数量的集群?
多线程运行缓慢有两个问题:
1) 数据传输到新线程 2) 从新线程传输回主线程的数据传输
问题 #1 通过使用mclapply
完全避免,除非修改数据,否则不会复制数据,在 unix 系统上。 (makeCluster
默认使用套接字传输数据)。
问题 #2 无法使用mclapply
来避免,但您可以做的是尽量减少传输回主线程的数据量。
朴素的麦克拉应用:
join3 = mclapply(1:10, function(j) {
join_i=chunk_join(j, A, B, C)
}, mc.cores=4) %>% rbindlist
稍微聪明一点:
chunk_join2=function(i, A, B, C)
{
A_i=A %>% filter(X2==i)
B_i=B %>% filter(X2==i) %>% select(X1, X3)
C_i=C %>% filter(X2==i) %>% select(X1, X3)
join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
join_i[,c(-1,-2,-3)]
}
A <- arrange(A, X2)
join5 = mclapply(1:10, function(j) {
join_i=chunk_join2(j, A, B, C)
}, mc.cores=4) %>% rbindlist
join5 <- cbind(A, join5)
基准:
Single threaded: 4.014s
Naive mclapply: 1.860 s
Slightly smarter mclapply: 1.363 s
如果您的数据有很多列,您可以看到问题 #2 将如何完全使系统陷入困境。 例如,通过返回 B 和 C 的索引而不是整个 data.frame 子集,您可以做得更好。