如何有效地使不同的SparkSQL执行并行



环境

  • Scala
  • Apache Spark:Spark 2.2.1
  • emr在AWS上:EMR-5.12.1

内容

我有一个大数据框架,如下:

val df = spark.read.option("basePath", "s3://some_bucket/").json("s3://some_bucket/group_id=*/")

s3://some_bucket上有JSON文件〜1TB,其中包括5000个group_id分区。我想使用SparkSQL执行转换,并且每个group_id都不同。

火花代码如下:

// Create view
val df = spark.read.option("basePath", "s3://data_lake/").json("s3://data_lake/group_id=*/")
df.createOrReplaceTempView("lakeView")
// one of queries like this:
// SELECT 
//   col1 as userId,
//   col2 as userName,
//   .....
// FROM
//   lakeView
// WHERE
//   group_id = xxx;
val queries: Seq[String] = getGroupIdMapping
// ** Want to know better ways **
queries.par.foreach(query => {
  val convertedDF: DataFrame = spark.sql(query)
  convertedDF.write.save("s3://another_bucket/")
})

par可以通过 Runtime.getRuntime.availableProcessors num并行化,并且等于驱动器的数量。

,但似乎很奇怪且不够有效,因为它与Spark的划分无关。

我真的很想使用scala.collection.Seq中的groupBy

这不是正确的火花代码:

df.groupBy(groupId).foreach((groupId, parDF) => {
  parDF.createOrReplaceTempView("lakeView")
  val convertedDF: DataFrame = spark.sql(queryByGroupId)
  convertedDF.write.save("s3://another_bucket")
})

1(首先,如果您的数据已经存储在每个组ID中,则没有理由将其混合,然后使用SPARK通过ID进行组组。为每个组ID加载仅相关文件

更简单,更有效

2(火花本身平行于计算。因此,在大多数情况下,不需要外部并行化。但是,如果您觉得Spark无法利用所有资源:

a(如果每个单独的计算所需的时间少于几秒钟,则任务计划开销与任务执行时间相当,因此可以通过并行运行几个任务来获得提升。

b(计算需要大量时间,但资源仍未得到充分利用。那么您很可能应该增加数据集的分区数量。

3(如果您最终决定并行运行多个任务,则可以通过这种方式实现:

val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map(query => {
  Future{
    //spark stuff here
    0
  }(ec)
})
val allDone: Future[Seq[Int]] = Future.sequence(results)
//wait for results
Await.result(allDone, scala.concurrent.duration.Duration.Inf)
executor.shutdown //otherwise jvm will probably not exit 

最新更新