Mahout spark项目相似性saveAsTextFile最后阶段非常缓慢



我在cli的HDP 2.2集群上的Spark 1.5.1上以YARN客户端模式使用Mahout 0.11.0。我的输入大约是325Mb,分为1000个部分文件。这是我调用的确切命令:

$MAHOUT_HOME/bin/mahout spark-itemsimilarity --input unit-similarity-dump/bpc1 --output mahout-cooccurrence-output4 --maxPrefs 200 --maxSimilaritiesPerItem 100 --master yarn-client --sparkExecutorMem 10g -D:spark.yarn.executor.memoryOverhead=1024 -D:spark.executor.cores=5 -D:spark.executor.instances=50 -D:spark.yarn.am.memory=4g -D:spark.yarn.am.memoryOverhead=512 -D:spark.yarn.am.cores=2 -D:spark.driver.memory=20g -D:spark.yarn.driver.memoryOverhead=2048 -D:spark.driver.cores=4 -D:spark.driver.maxResultSize=10g -D:spark.yarn.queue=product -D:hdp.version=2.2.6.0-2800

应用程序很好地运行,直到最后一个阶段,调用saveAsTextFile。此时,任务逐渐停止,每项任务都需要45分钟到一个小时才能成功。经过仔细检查,似乎每个任务都在读取MapPartitionsRDD的所有1000个分区,我直观地认为,这一定是性能问题的根源。这些分区在所有执行器中分布得有些均匀,所以我认为每个任务都需要从不是其直接父任务的n-1个执行器中请求所有分区。

优化此应用程序的最佳方法是什么?更少的分区,所以请求的远程数据更少?更少的执行器,所以每个任务的数据本地化百分比更高?是否尝试强制RDD使用更高的复制因子?目前,它似乎默认为存储级别:内存反序列化1x复制,100%缓存。

为了清晰起见,下面是阶段详细信息的屏幕截图:saveAsTextFile阶段

提前感谢您的任何见解。

更新:

我尝试只使用一个具有多个核心(即任务)的执行器,尽管所有RDD分区都存在于一个本地执行器上,但性能仍然非常慢。我认为唯一剩下的罪魁祸首是reduceByKey在最后的saveAsTextFile DAG中造成的洗牌。

第二次更新:

我还尝试过只使用1个输入分区,而我以前使用的是100甚至1000。结果非常相似,在此进行总结。为了清楚起见,在这次运行中,我使用了一个20克的5个内核的执行器。然而,这种方法确实导致聚合资源分配减少了大约一个数量级(以MB秒和vcore秒为单位)。这可能是由于在以前的运行中执行器和线程的过度分配,这意味着瓶颈可能不受计算限制。

不确定我是否遵循了上面的所有描述。有一个BiMap双向字典,它将列和行id从序数Mahout id转换为字符串外部id。这些数据结构在内存中,每种类型的id(行/列)有2个哈希映射。reduceByKey在Mahout id上工作,因此翻译只在输入和输出期间发生。这些数据结构被读取到驱动程序中,然后广播到每个节点,每个节点只制作一个副本,执行器共享BiMap(实际上是BiDictionary)。

默认情况下,分区设置为"自动"。在Mahout 11中,这应该是一个针对共现计算优化的值,这就是为什么事情"嗡嗡作响"的原因。

在之后的最后一步reduceByKey获取剩余矩阵(行键、向量)中的每个值,将键和向量元素的每个id转换回字符串,并将文本并行写入文件。

坦率地说,我发现文本文件的读写在很大程度上依赖于手工调整。我的主要经验是并行读取,其中Spark在分区之前读取所有文件统计数据。与读取前将1000个文件连接到一个文件相比,这是非常慢的(自己试试,他们可能已经解决了这个问题)。

听起来你需要一个更好的saveAsTextFile。手动调整saveAsTextFile可能最好使用您自己的分布式操作,foreach在基于您自己的参数对RDD进行一些重新分区后才能工作。请参阅此处的文档:http://spark.apache.org/docs/latest/programming-guide.html#printing-rdd 的元素

如果您想进行实验,可以使用TextDelimitedIndexedDatasetReaderWriter子类来提供您自己的编写器Trait。Mahout有一个mapBlock操作,也可以使用。它将一个行块传递给每个mapBlock,您可以使用BiMap来转换id。

很想听听Mahout用户列表上的任何结果。

最新更新