当使用tensorflow java进行推理时,使作业在YARN上运行的内存量异常大。该作业在我的计算机上使用 spark 完美运行(2 核 16Gb RAM(,需要 35 分钟才能完成。但是当我尝试在具有 10 个执行程序 16Gb 内存和 16 Gb 内存开销的 YARN 上运行它时,执行程序因使用太多内存而被杀死。
预测在具有 YARN 2.7.3 和 Spark 2.2.1 的 Hortonworks 集群上运行。以前我们使用DL4J进行推理,一切都在3分钟内运行。 张量在使用后被正确关闭,我们使用mapPartition进行预测。每个任务包含大约 20.000 条记录 (1Mb(,因此这将使输入张量为 2.000.000x14,输出张量为 2.000.000 (5Mb(。
在 YARN 上运行时传递给 Spark 的选项
--master yarn --deploy-mode cluster --driver-memory 16G --num-executors 10 --executor-memory 16G --executor-cores 2 --conf spark.driver.memoryOverhead=16G --conf spark.yarn.executor.memoryOverhead=16G --conf spark.sql.shuffle.partitions=200 --conf spark.tasks.cpu=2
如果我们设置 spark.sql.shuffle.partitions=2000,此配置可能会起作用,但需要 3 小时
更新:
本地和集群之间的差异实际上是由于缺少过滤器。 我们实际上对比我们更多的数据运行预测。
要减少每个分区的内存占用,您必须在每个分区内创建批处理(使用grouped(batchSize)
(。因此,您比对每一行运行预测更快,并且您分配了预制大小(batchSize(的张量。如果你调查tensorflowOnSpark scala推理的代码,这就是他们所做的。下面您将找到此代码可能无法编译的实现的重新设计示例,但您可以了解如何执行此操作。
lazy val sess = SavedModelBundle.load(modelPath, "serve").session
lazy val numberOfFeatures = 1
lazy val laggedFeatures = Seq("cost_day1", "cost_day2", "cost_day3")
lazy val numberOfOutputs = 1
val predictionsRDD = preprocessedData.rdd.mapPartitions { partition =>
partition.grouped(batchSize).flatMap { batchPreprocessed =>
val numberOfLines = batchPreprocessed.size
val featuresShape: Array[Long] = Array(numberOfLines, laggedFeatures.size / numberOfFeatures, numberOfFeatures)
val featuresBuffer: FloatBuffer = FloatBuffer.allocate(numberOfLines)
for (
featuresWithKey <- batchPreprocessed;
feature <- featuresWithKey.features
) {
featuresBuffer.put(feature)
}
featuresBuffer.flip()
val featuresTensor = Tensor.create(featuresShape, featuresBuffer)
val results: Tensor[_] = sess.runner
.feed("cost", featuresTensor)
.fetch("prediction")
.run.get(0)
val output = Array.ofDim[Float](results.numElements(), numberOfOutputs)
val outputArray: Array[Array[Float]] = results.copyTo(output)
results.close()
featuresTensor.close()
outputArray
}
}
spark.createDataFrame(predictionsRDD)
我们使用 FloatBuffer 和 Shape 来创建张量,如本期所推荐的那样