火花数据框并行性



以下是我的用户酶,我正在使用apache spark

1)我在HDF上有大约2500个Parquet文件,文件大小从文件到文件变化。

2)我需要处理每个镶木quet文件并构建一个新的数据框架,然后将新的数据帧写入ORC文件格式。

3)我的火花驱动程序程序就是这样。我正在迭代每个文件,处理单个镶木点文件创建一个新的数据框并编写新的数据框为ORC,下面是代码片段。

  val fs = FileSystem.get(new Configuration())
  val parquetDFMap = fs.listStatus(new Path(inputFilePath)).map(folder => {
  (folder.getPath.toString, sqlContext.read.parquet(folder.getPath.toString))})
parquetDFMap.foreach {
  dfMap =>
    val parquetFileName = dfMap._1
    val parqFileDataFrame = dfMap._2
    for (column <- parqFileDataFrame.columns) 
    {
       val rows = parqFileDataFrame.select(column)
            .mapPartitions(lines => lines.filter(filterRowsWithNullValues(_))
            .map(row => buildRowRecords(row, masterStructArr.toArray, valuesArr)))
        val newDataFrame: DataFrame = parqFileDataFrame.sqlContext.createDataFrame(rows, StructType(masterStructArr))
       newDataFrame.write.mode(SaveMode.Append).format("orc").save(orcOutPutFilePath+tableName)
    }
}

这种设计的问题我只能及时处理一个木木quet文件,只有在创建新的数据框架以及将新的数据帧写入ORC格式时才能应用并行性。因此,如果任何任务(例如创建新的数据框架或编写新的数据框架到兽人)需要很长时间才能完成其他排列的镶木木材处理,直到当前的镶木质操作完成为止。

您能以更好的方法或设计此用户酶来帮助我。

您可以为所有镶木quet文件创建一个单个数据框,而不是每个文件范围的一个数据框

val df =  sqlContext.read.parquet(inputFilePath)
df.map(row => convertToORc(row))

我能够通过执行parquetDFMap.foreach.par

并行处理Parquet文件处理。

最新更新