r - 如何将这个自定义过程并行化,或利用多核完成?



我正试图弄清楚如何在我写的这个随机森林(random forest)调参循环中使用并行处理包,例如 foreach 或 doParallel:

  # Exhaustive grid search over randomForest tuning parameters via nested loops.
  # For every combination of (nodesize, mtry, maxdepth, cp) one model is trained
  # on `train` (defined elsewhere), and one summary row (id, parameters,
  # training accuracy, training AUC) is appended to ModelInfo.
  # NOTE(review): ModelInfo grows one row per iteration, which copies the whole
  # data frame each time; preallocating n*m*z*p rows would avoid O(n^2) copying.
  ModelInfo <- data.frame ( model=as.numeric()
                     ,Nodesize=as.numeric()
                     ,Mrty=as.numeric()
                     ,Maxdepth=as.numeric()
                     ,Cp=as.numeric()
                     ,Accuracy_Training=as.numeric()
                     ,AUC_Training=as.numeric())
               # w is the running row index / model id in ModelInfo.
               w=1
               set.seed(1809)
              NumberOfSamples=1
              # Number of iterations
              # Per-model result containers, all nested lists indexed
              # [[i]][[j]][[k]][[l]][[m]] to mirror the five loop levels.
              # NOTE(review): `roundpred` is initialized (and re-initialized at
              # every level below) but never assigned a result — dead state.
              rfPred=list()
              pred=list()
              roundpred=list()
              cTab=list()
              Acc=list()
              pred.to.roc=list()
              pred.rocr=list()
              perf.rocr=list()
              AUC=list()
              Var_imp=list()
            rf_model_tr = list()
            length(rf_model_tr) <- NumberOfSamples
            # Level 1: samples (only one here).
            for (i in 1:NumberOfSamples)
            {
            rf_model_tr[[i]] = list()
            rfPred[[i]]=list()
            pred[[i]]=list()
            roundpred[[i]]=list()
            cTab[[i]]=list()
            Acc[[i]]=list()
            pred.to.roc[[i]]=list()
            pred.rocr[[i]]=list()
            perf.rocr[[i]]=list()
            AUC[[i]]=list()
            Var_imp[[i]]=list()
            ## Tune nodesize
            nodesize =c(10,20,50,80,100,200)
            n=length(nodesize)
            length(rf_model_tr[[i]]) <- n
            # Level 2: nodesize values.
            for ( j in 1: length (nodesize))
            {
rf_model_tr[[i]][[j]] = list()
rfPred[[i]][[j]]=list()
pred[[i]][[j]]=list()
roundpred[[i]][[j]]=list()
cTab[[i]][[j]]=list()
Acc[[i]][[j]]=list()
pred.to.roc[[i]][[j]]=list()
pred.rocr[[i]][[j]]=list()
perf.rocr[[i]][[j]]=list()
AUC[[i]][[j]]=list()
Var_imp[[i]][[j]]=list()
## Tune mrty
mrtysize =c(2,3,4)
# NOTE(review): this `m` (count of mtry values) is silently shadowed by the
# innermost loop index `m` in the cp loop below — harmless here but fragile.
m=length(mrtysize)
length(rf_model_tr[[i]][[j]]) <- m
# Level 3: mtry values.
for ( k in 1: length (mrtysize))
{  

  rf_model_tr[[i]][[j]][[k]] = list()
  rfPred[[i]][[j]][[k]]=list()
  pred[[i]][[j]][[k]]=list()
  roundpred[[i]][[j]][[k]]=list()
  cTab[[i]][[j]][[k]]=list()
  Acc[[i]][[j]][[k]]=list()
  pred.to.roc[[i]][[j]][[k]]=list()
  pred.rocr[[i]][[j]][[k]]=list()
  perf.rocr[[i]][[j]][[k]]=list()
  AUC[[i]][[j]][[k]]=list()
  Var_imp[[i]][[j]][[k]]=list()
  ## Tune maxdepth
  maxdep =c(10,20,30)
  z=length(maxdep)
  length(rf_model_tr[[i]][[j]][[k]]) <- z
  # Level 4: maxdepth values.
  for (l in 1:length (maxdep))
  { 
    rf_model_tr[[i]][[j]][[k]][[l]] = list()
    rfPred[[i]][[j]][[k]][[l]]=list()
    pred[[i]][[j]][[k]][[l]]=list()
    roundpred[[i]][[j]][[k]][[l]]=list()
    cTab[[i]][[j]][[k]][[l]]=list()
    Acc[[i]][[j]][[k]][[l]]=list()
    pred.to.roc[[i]][[j]][[k]][[l]]=list()
    pred.rocr[[i]][[j]][[k]][[l]]=list()
    perf.rocr[[i]][[j]][[k]][[l]]=list()
    AUC[[i]][[j]][[k]][[l]]=list()
    Var_imp[[i]][[j]][[k]][[l]]=list()
    ## Tune cp
    cp =c(0,0.01,0.001)
    p=length(cp)
    length(rf_model_tr[[i]][[j]][[k]][[l]]) <- p
    # Level 5: cp values — the innermost loop where each model is fit.
    for (m in 1:length (cp))
      {
        # Train one forest per parameter combination.
        # NOTE(review): `maxDepth` and `cp` are not documented arguments of
        # randomForest::randomForest (cp belongs to rpart); they are most
        # likely swallowed by `...` and silently ignored — verify.
        rf_model_tr[[i]][[j]][[k]][[l]][[m]]= randomForest  (as.factor(class) ~.

                                                                  , data=train,mtry=mrtysize[[k]],maxDepth = maxdep[[l]], replace=F, importance=T, do.trace=10, ntree=200,nodesize=nodesize[j],cp=cp[[m]])   
        #Accuracy
        # Both accuracy and AUC below are computed on the TRAINING data, so
        # they are optimistic estimates, not out-of-sample performance.
        rfPred[[i]][[j]][[k]][[l]][[m]] <- predict(rf_model_tr[[i]][[j]][[k]][[l]][[m]], train, type = "prob")
        # Predicted class = column name with the highest class probability per row.
        pred[[i]][[j]][[k]][[l]][[m]] <- colnames(rfPred[[i]][[j]][[k]][[l]][[m]] )[apply(rfPred[[i]][[j]][[k]][[l]][[m]] ,1,which.max)]
        cTab[[i]][[j]][[k]][[l]][[m]]  = table(pred[[i]][[j]][[k]][[l]][[m]],train$class)
        # Accuracy = trace of the confusion table over its total.
        Acc[[i]][[j]][[k]][[l]][[m]]<- sum(diag(cTab[[i]][[j]][[k]][[l]][[m]])) / sum(cTab[[i]][[j]][[k]][[l]][[m]])
        #AUC
        # Uses column 2 of the probability matrix as the positive-class score
        # — assumes binary classification; prediction()/performance() are
        # presumably from the ROCR package. TODO confirm.
        pred.to.roc[[i]][[j]][[k]][[l]][[m]]<-rfPred[[i]][[j]][[k]][[l]][[m]][,2]
        pred.rocr[[i]][[j]][[k]][[l]][[m]]<-prediction(pred.to.roc[[i]][[j]][[k]][[l]][[m]],as.factor(train$class))
        perf.rocr[[i]][[j]][[k]][[l]][[m]]<-performance(pred.rocr[[i]][[j]][[k]][[l]][[m]],measure="auc",x.measure="cutoff")
        AUC[[i]][[j]][[k]][[l]][[m]]<-as.numeric(perf.rocr[[i]][[j]][[k]][[l]][[m]]@y.values)
        #Variable Importance
        Var_imp[[i]][[j]][[k]][[l]][[m]]<-(importance(rf_model_tr[[i]][[j]][[k]][[l]][[m]],type=2))

        # Append one summary row: id, parameters, training metrics.
        ModelInfo[w,1]<-w
        ModelInfo[w,2]<-nodesize[[j]]
        ModelInfo[w,3]<-mrtysize[[k]]
        ModelInfo[w,4]<-maxdep[[l]]
        ModelInfo[w,5]<-cp[[m]]
        ModelInfo[w,6]<-Acc[[i]][[j]][[k]][[l]][[m]]
        ModelInfo[w,7]<-AUC[[i]][[j]][[k]][[l]][[m]]
        w=w+1
      }
    }
  }
}
}

基本上,我所做的是基于随机森林的可用调整参数(nodesize、cp 等),用一个数据集创建所有可能的模型变体,并在每次迭代时把这些信息存入 ModelInfo 表。此外,我还记录了准确率和AUC等指标,以便比较最终生成的不同模型并从中选择。

我寻找替代方案的原因是,caret包只提供了调整mtry的功能;虽然caret里确实可以运行parRF来解决并行问题,但我更希望在我自己的这段代码里加入并行处理,这该怎么做呢?

我读过foreach和doParallel包的相关内容,但我不太明白如何在这里进行语法组合。

如果需要初始数据,请告诉我,我只是想在这一点上显示需要并行计算的部分。

提前感谢

嗨,我通常只是手动编码所有内容。在linux/mac中,我使用并行包和mclapply,它们可以使用内存分叉。分叉进程占用的内存更少,启动速度更快。Windows不支持分叉,所以我使用doParallel包(其他包也可以)。foreach()函数是一个用户友好的并行映射器。我发现自己花在建立单台PC并行计算上的时间比从加速中节省的时间还要多。仍然很有趣:)

如果你在一所大学工作,你或许可以使用一个大型集群。BatchJobs包是另一个映射器,它可以使用许多不同的后端,例如Torque/PBS队列(queue)系统。我可以借用80个节点,每个节点4个CPU,这在理论上能把速度提高320倍(实践中更接近150倍)。我是从一篇很好的介绍文章中了解到BatchJobs的。我也喜欢BatchJobs可以在本地以单核或多核运行,这让调试容易得多。

下面的代码演示如何创建一个作业列表,并用foreach或BatchJobs来执行它。每个作业就是一组参数;作业参数与标准参数合并后用于训练模型。每个作业返回一些统计信息,最后把所有结果和参数组合成一个数据框。

# Backend switch: TRUE -> foreach/doParallel (single machine),
# FALSE -> BatchJobs (cluster; only faster for cluster computing).
useForeach = FALSE #If FALSE, will run as batchjobs. Only faster for cluster computing.
library(randomForest)
#load a data set: UCI white wine quality, semicolon-separated CSV
url =  "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv"
download.file(url,destfile="winequality-white.csv",mode="w")
wwq = read.csv(file="winequality-white.csv",header=T,sep=";")
# Split into predictors X and target y, matching randomForest(x=, y=).
X = wwq[,names(wwq) != "quality"]
y = wwq[,"quality"]
#2 - make jobs: one job per hyper-parameter combination.
# expand.grid builds the full cartesian product of the tuning values
# (3 mtry x 3 sampsize x 2 nodesize = 18 combinations).
pars = expand.grid(
  mtry = c(1:3),
  sampsize = floor(seq(1000,1898,length.out = 3)),
  nodesize = c(1,3)
)
# One job = one single-row data.frame of randomForest arguments.
# seq_len(nrow(pars)) is safe for an empty grid, where 1:dim(pars)[1]
# would wrongly iterate over c(1, 0).
jobs = lapply(seq_len(nrow(pars)), function(i) pars[i,])

#3 - make node function that will execute a number of jobs.
# Worker: takes a list of jobs (each a one-row data.frame of randomForest
# arguments), trains one model per job, and returns one summary row per job,
# row-bound into a single data.frame.
test.pars = function(someJobs,useForeach=TRUE) {
  #if running cluster, global environment imported manually
  # (BatchJobs workers start in a fresh R session; the saved state file
  # supplies X, y and everything else this function reads from globals)
  if(!useForeach) load(file="thisState.rda")
  do.call(rbind,lapply(someJobs,function(aJob){ #do jobs and bind results by rows
    print(aJob)
    merged.args = c(alist(x=X,y=y),as.list(aJob)) #merge std. and job args
    run.time = system.time({rfo = do.call(randomForest,merged.args)}) #run a job
    # NOTE(review): rfo$rsq / rfo$mse exist for regression forests (numeric y),
    # so the column name "accuracy" actually holds R-squared — misleading,
    # but renaming would break downstream consumers; confirm intent.
    data.frame(accuracy=tail(rfo$rsq,1),run.time=run.time[3],mse=tail(rfo$mse,1))
  }))
}

##test function single core
# Smoke-test the worker on the first five jobs before going parallel,
# and print results side-by-side with their parameters.
jobsToTest = 1:5
out = test.pars(jobs[jobsToTest])
print(cbind(out,do.call(rbind,jobs[jobsToTest])))

#4a execute jobs with foreach package:
if(useForeach) {
  library(foreach)
  library(doParallel)
  CPUs=4 
  cl = makeCluster(CPUs)#works both for windows and linux, otherwise forking is better
  registerDoParallel(cl)
  # Split the job list into one chunk per worker; each %dopar% iteration
  # runs test.pars() on one chunk and the rows are rbind-ed together.
  nodes=min(CPUs,length(jobs)) #how many splits of jobList, not so important for foreach...
  job.array = suppressWarnings(split(jobs,1:nodes)) #split warns if each core cannot get same amount of jobs
  out = foreach(i=job.array,.combine=rbind,.packages="randomForest") %dopar% test.pars(i)
  stopCluster(cl) 
} else {
  library(BatchJobs)
  #4b - execute jobs with BatchJobs package (read manual how to set up on cluster)
  nodes=min(80,length(jobs))   # how many nodes to split job onto
  job.array = split(jobs,1:nodes)
  # Cluster workers start fresh sessions; test.pars(useForeach=FALSE)
  # reloads this file to recover X, y, etc.
  save(list=ls(),file="thisState.rda") #export this state(global environment) to every node
  #initiate run: register jobs, map each chunk to test.pars, submit and wait
  reg = makeRegistry(id ="myFirstBatchJob",packages="randomForest")
  batchMap(reg,fun=test.pars,someJobs = job.array,more.args=list(useForeach=FALSE))
  submitJobs(reg)
  waitForJobs(reg)
  out = loadResults(reg)
  #6- wrap up save filnalResults to user
  finalResult = cbind(do.call(rbind,jobs),do.call(rbind,out))
  save(out,file="finalResult.rda")
  removeRegistry(reg,ask="no")
}
#7- print final result
# NOTE(review): in the BatchJobs branch `out` is a list of data.frames
# (loadResults), so this cbind differs from the foreach branch where `out`
# is already a single data.frame; `finalResult` above is the combined table.
print(cbind(do.call(rbind,jobs),out))

最新更新