Spark将任务分配给几个执行器



我喜欢并行运行SQL查询,并能够控制8个查询的并行级别。现在,我正在编写这段代码。这个想法是创建8个分区,并允许执行器并行运行它们。

(1 to 8).toSeq.toDF.repartition(8) // 8 partitions
.rdd.mapPartitions(
x => {
val conn = createConnection()
x.foreach{
s => { // expect the below query be run concurently
execute(s"SELECT * FROM myTable WHERE col = ${s.get(0)}")
}
}
conn.close()
x
}).take(1)

问题是这8个查询是逐个运行的。

我应该如何继续让查询以8乘8运行?

进行时

val df = (1 to 8).toSeq.toDF.repartition(8)

这不会创建8个分区,每个分区有1条记录。如果检查此数据帧(请参见例如。https://stackoverflow.com/a/46032600/1138523),然后你得到:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|               0|                0|
|               1|                0|
|               2|                0|
|               3|                0|
|               4|                0|
|               5|                0|
|               6|                4|
|               7|                4|
+----------------+-----------------+

因此,您将只有2个非空分区,因此您将具有最大2倍的并行性(我在这里问过:Spark中的Round Robin分区是如何工作的?)

要制作大小相等的分区,最好使用

spark.sparkContext.parallelize((0 to 7), numSlices = 8)

而不是

(1 to 8).toSeq.toDF.repartition(8).rdd

第一个选项为每个分区提供1条记录,第二个选项不是因为它使用循环分区

附带说明一下,当您执行x.foreach时,x将被消耗(迭代器只能遍历一次),因此如果您返回x,您将始终得到一个空迭代器。

所以你的最终代码可以是这样的:

spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.mapPartitions(
x => {
val xL = x.toList  // convert to List
assert(xL.size==1) // make sure partition has only 1 record
val conn = createConnection()
xL.foreach{
s => { // expect the below query be run concurently
execute(s"SELECT * FROM myTable WHERE col = ${s}")
}
}
conn.close()
xL.toIterator
})
.collect // trigger all queries

不使用mapPartitions(它是惰性的),您也可以使用foreachPartition,它是非惰性

由于每个分区只有1条记录,因此迭代分区并不是真正有益的,您也可以只使用普通的foreach:

spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.foreach( s=> {
val conn = createConnection()
execute(s"SELECT * FROM myTable WHERE col = ${s}")   
conn.close()
})

最新更新