Spark PageRank Tuning



I am running PageRank with Scala + Spark 2.4 on YARN, but it always fails after running for a few hours / a few jobs.

--driver-memory 60G --driver-cores 4

--num-executors 250 --executor-cores 2 --executor-memory 32g

Input data:

  • weightFile: 1000 .gz files, 500 MB each, 500 GB in total
  • linkFile: 1000 .gz files, 500 MB each, 500 GB in total

How should I change the code or the Spark configuration?


```scala
import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir(checkpointFile)

// Weight input: url \t weight
val weightData = sc.textFile(weightFile).repartition(20000)
val weightUrlData = weightData.map { line =>
  val fields = line.split("\t")
  (hash(fields(0)), fields(0), fields(1).toFloat)
}
weightUrlData.persist(StorageLevel.DISK_ONLY)

var dataWeight = weightUrlData.map(x => (x._1, x._3))
dataWeight = dataWeight.reduceByKey((a, b) => if (a > b) a else b)
val dataUrl = weightUrlData.map(x => (x._1, x._2))

val totalZ = dataWeight.count.toFloat
val sum1 = dataWeight.map(_._2).sum().toFloat
dataWeight = dataWeight.map { x => (x._1, x._2 / sum1) }

// Link input: srcUrl \t dstUrl \t linkWeight
val linkData = sc.textFile(linkFile).repartition(20000)
val links = linkData.map { line =>
  val fields = line.split("\t")
  (hash(fields(0)), (hash(fields(1)), fields(2).toFloat))
}.groupByKey()
links.persist(StorageLevel.DISK_ONLY)
links.count()

var ranks = links.mapValues(v => 1.0)

for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    urls.map(url => (url._1, url._2 * rank))
  }
  contribs.persist(StorageLevel.DISK_ONLY)
  contribs.count()
  val ranksTmp = contribs.reduceByKey(_ + _).mapValues(0.85 * _)
  val Zranks = ranksTmp.map(_._2).sum()
  val Z = totalZ - Zranks
  println("total Z: " + totalZ + " Z: " + Z)
  val randnZ = dataWeight.map { x => (x._1, x._2 * Z) }
  val rankResult = ranksTmp.rightOuterJoin(randnZ).map { case (a, (b, c)) => (a, b.getOrElse(0.0) + c) }
  ranks = ranks.join(rankResult).map { case (a, (b, c)) => (a, c) }
  if (i % 2 == 0) {
    ranks.persist(StorageLevel.MEMORY_AND_DISK)
    ranks.checkpoint()  // truncate the growing lineage every other iteration
    ranks.count()
  } else {
    ranks.count()
  }
  contribs.unpersist()  // release this iteration's disk cache before the next one
  if (i == iters) {
    rankResult.map { case (a, b) => a.toString + "\t" + b.toString }.saveAsTextFile(outputFile)
    dataUrl.join(rankResult).values.map { case (a, b) => a + "\t" + b.toString }.saveAsTextFile(outputFile + "UrlAndWeight")
  }
}
```

It is hard to guess why your code fails just by glancing at it. A few years ago I implemented a PageRank to rank users in a social graph, and it ran smoothly for me - link. Maybe it will help you. Spark's Pregel interface runs PageRank until convergence, or you can set a fixed number of iterations.
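If you want to try that route, here is a minimal sketch using GraphX's built-in PageRank, which offers both a fixed-iteration and a run-until-convergence variant. It reuses `sc`, `linkFile`, `outputFile`, and `hash` from the question's code, and assumes `hash` returns a `Long` (GraphX vertex ids are `Long`); note that GraphX's PageRank normalizes edge weights by out-degree rather than using your dangling-mass redistribution scheme:

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel

// Build the graph from the same link input: srcUrl \t dstUrl \t linkWeight.
val edges = sc.textFile(linkFile).map { line =>
  val f = line.split("\t")
  Edge(hash(f(0)), hash(f(1)), f(2).toFloat)
}
val graph = Graph.fromEdges(edges, defaultValue = 1.0,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

// Fixed number of iterations:
val fixedRanks = graph.staticPageRank(20).vertices
// Or iterate until the ranks change by less than a tolerance:
val convergedRanks = graph.pageRank(0.0001).vertices

convergedRanks.map { case (id, rank) => s"$id\t$rank" }.saveAsTextFile(outputFile)
```

This also removes the hand-rolled join/checkpoint loop, since GraphX handles iteration and intermediate state internally.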
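Separately, one detail worth checking in your job: gzip is not a splittable codec, so `sc.textFile` produces roughly one partition per .gz file (about 1000 here), each 500 MB file decompressed by a single task, and the subsequent `repartition(20000)` then shuffles the full 500 GB. A quick check, reusing `weightFile` from the question:

```scala
// .gz files are not splittable: each one is read whole by a single task,
// so the partition count equals the file count before any repartition.
val raw = sc.textFile(weightFile)
println(raw.getNumPartitions)  // about 1000, one per .gz file
```

Storing the inputs uncompressed or in a splittable format would let Spark parallelize the read itself instead of relying on the repartition shuffle.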
