计算火花缓存的RDD n次



我正在面临Spark应用程序的问题。这是我的代码的简化版本:

def main(args: Array[String]) {
    // Initializing spark context
    val sc = new SparkContext()
    val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
    System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)
    // Getting files from TGZ archives
    val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
    val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
        logger.debug("Getting files from archive : "+tgzStream._1)
        utils.getFilesFromTgzStream(tgzStream._2)
    })
    // We run the same process with 3 different "modes"
    val modes = Seq("mode1", "mode2", "mode3")
    // We cache the RDD before
    val nb = filesRDD.cache().count()
    logger.debug($nb + " files as input")
    modes.map(mode => {
        logger.debug("Processing files with mode : " + mode)
        myProcessor.process(mode, filesRDD)
    })
    filesRDD.unpersist() // I tried with or without this
    [...]
}

生成的日志是(例如,以3个档案作为输入):

从存档中获取文件:a

从存档中获取文件:b

从存档中获取文件:c

3个文件作为输入

使用模式处理文件:mode1

从存档中获取文件:a

从存档中获取文件:b

从存档中获取文件:c

使用模式处理文件:mode2

从存档中获取文件:a

从存档中获取文件:b

从存档中获取文件:c

使用模式处理文件:mode3

从存档中获取文件:a

从存档中获取文件:b

从存档中获取文件:c

我的火花配置:

  • 版本:1.6.2
  • 执行人:20 x 2cpu x 8go ram
  • 纱线上的纱线内存:800mo
  • 驱动程序:1CPU X 8GO RAM

我从这些日志中了解的是,文件提取是一部分的4次仪器!这显然使我遇到了空间问题和性能泄漏...

我做错了吗?

编辑:我还尝试使用modes.foreach(...)而不是MAP,但没有任何更改...

您是否尝试将modes.map结果传递到列表构造函数(即List(modes.map{ /*...*/}))中?有时(我不确定何时)scala collections懒惰的映射,因此,如果直到Spark删除缓存后才对其进行评估,则必须重新计算。

好吧,经过大量测试,我终于解决了这个问题。实际上存在两个问题:

  1. 我低估了输入数据的大小: SPARK的cachepersist功能如果RDD太大而无法完全存储在总记忆的60%中,我知道,但是我知道,但是认为我的输入数据并不大,但实际上我的RDD是80GB。但是我的记忆中有60%(160GB)仍然超过80GB,所以发生了什么?问题n°2 ...

  2. 我的partitons太大了:在我的代码中的某个地方,我的RDD的分区数设置为100,所以我有100个partitons每个1.6GB。问题在于我的数据由每个MEG的字符串组成,因此我的分区不完整,而10GB的使用内存实际上仅包含7或8GB的真实数据。

要解决这些问题,我必须使用persist(StorageLevel.MEMORY_SER),以增加计算时间,但会大大减少内存使用(根据此基准测试),然后将分区编号设置为1000(根据Spark Docudmentation,建议分区〜128MB)

最新更新