我试图在8核计算机上使用并行包在Windows上处理大量数据。我有一个大数据帧需要逐行处理。对于每一行,我可以估计处理该行所需的时间,这可以从每行10秒到4小时不等。
我不想在clusterApplyLB函数下立即运行整个程序(我知道这可能是最优的方法),因为如果它遇到错误,那么我的整个结果集可能会丢失。我第一次尝试运行我的程序是把它分解成块,然后并行地运行每个块,保存并行运行的输出,然后继续运行下一个块。
问题是,当它通过行运行时,而不是以7倍的"实时"时间运行(我有8个内核,但我想保留一个备用),它似乎只以大约2倍的速度运行。我猜测这是因为向每个核心分配行效率很低。
例如,10行数据具有2个核心,其中两行可以在4小时内运行,其他两行将花费10秒。理论上,这可能需要4小时10秒来运行,但如果分配不合理,则可能需要8小时。(显然这是一种夸张,但是当更多的核和更多的行估计不正确时,也会发生类似的情况)
如果我估计这些时间并按照我估计的正确顺序将它们提交给clusterApplyLB(以使估计的时间分布在各个核心上以最小化所花费的时间),它们可能不会被发送到我想要它们的核心,因为它们可能不会在我估计的时间内完成。例如,我估计两个进程的时间分别为10分钟和12分钟,它们的时间为11.6分11.4秒,那么提交给clusterApplyLB的行顺序就不会像我预期的那样。这种错误可能看起来很小,但是如果我优化了多个长时间的行,那么这种混乱的顺序可能会导致两个4小时的行转到相同的节点,而不是不同的节点(这几乎会使我的总时间翻倍)。
TL,博士。我的问题:是否有一种方法可以告诉R并行处理函数(例如clusterApplyLB, clusterApply, parApply或任何sapply, lapply或foreach变体)哪些行应该发送到哪个核心/节点?即使没有我所处的情况,我认为这将是一个非常有用和有趣的信息。
我想说你的问题有两种不同的解决方法。
第一个是根据预期的每个作业计算时间对作业到节点映射进行静态优化。在开始计算之前,您将为每个作业(即数据框的行)分配一个节点。下面给出了可能实现此功能的代码。
第二个解决方案是动态的,您必须根据clusterApplyLB
中给出的代码制作自己的负载平衡器。您将开始与第一种方法相同,但是一旦作业完成,您就必须重新计算最佳作业到节点的映射。根据您的问题,由于不断进行重新优化,这可能会增加显著的开销。我认为,只要你对预期的计算时间没有偏见,就没有必要这样做。
下面是第一种解决方法的代码:
library(parallel)
#set seed for reproducible example
set.seed(1234)
#let's say you have 100 calculations (i.e., rows)
#each of them takes between 0 and 1 second computation time
expected_job_length=runif(100)
#this is your data
#real_job_length is unknown but we use it in the mock-up function below
df=data.frame(job_id=seq_along(expected_job_length),
expected_job_length=expected_job_length,
#real_job_length=expected_job_length + some noise
real_job_length=expected_job_length+
runif(length(expected_job_length),-0.05,0.05))
#we might have a negative real_job_length; fix that
df=within(df,real_job_length[real_job_length<0]<-
real_job_length[real_job_length<0]+0.05)
#detectCores() gives in my case 4
cluster_size=4
准备作业到节点的映射优化:
#x will give the node_id (between 1 and cluster_size) for each job
total_time=function(x,expected_job_length) {
#in the calculation below, x will be a vector of reals
#we have to translate it into integers in order to use it as index vector
x=as.integer(round(x))
#return max of sum of node-binned expected job lengths
max(sapply(split(expected_job_length,x),sum))
}
#now optimize the distribution of jobs amongst the nodes
#Genetic algorithm might be better for the optimization
#but Differential Evolution is good for now
library(DEoptim)
#pick large differential weighting factor (F) ...
#... to get out of local minimas due to rounding
res=DEoptim(fn=total_time,
lower=rep(1,nrow(df)),
upper=rep(cluster_size,nrow(df)),
expected_job_length=expected_job_length,
control=DEoptim.control(CR=0.85,F=1.5,trace=FALSE))
#wait for a minute or two ...
#inspect optimal solution
time_per_node=sapply(split(expected_job_length,
unname(round(res$optim$bestmem))),sum)
time_per_node
# 1 2 3 4
#10.91765 10.94893 10.94069 10.94246
plot(time_per_node,ylim=c(0,15))
abline(h=max(time_per_node),lty=2)
#add node-mapping to df
df$node_id=unname(round(res$optim$bestmem))
现在是在集群上进行计算的时候了:
#start cluster
workers=parallel::makeCluster(cluster_size)
start_time=Sys.time()
#distribute jobs according to optimal node-mapping
clusterApply(workers,split(df,df$node_id),function(x) {
for (i in seq_along(x$job_id)) {
#use tryCatch to do the error handling for jobs that fail
tryCatch({Sys.sleep(x[i,"real_job_length"])},
error=function(err) {print("Do your error handling")})
}
})
end_time=Sys.time()
#how long did it take
end_time-start_time
#Time difference of 11.12532 secs
#add to plot
abline(h=as.numeric(end_time-start_time),col="red",lty=2)
stopCluster(workers)
根据输入,似乎您已经在该任务中保存了任务的输出。假设每个并行任务都将输出保存为文件,您可能需要一个初始函数来预测特定行的时间。为了做到这一点
- 生成具有估计时间和行号的结构
- 对估计时间进行排序,重新排序行并运行并行
这将自动平衡工作负载。我们有一个类似的问题,这个过程必须按列进行,每个列花费10-200秒。因此,我们生成了一个函数来估计时间,并在此基础上对列重新排序,并对每个列运行并行处理。