Spark.sql.shuffle.200 个默认分区的分区难题



在许多帖子中,由于一些关于洗牌、分区、由于 JOIN、AGGR 等原因而对洗牌、分区的问题,有这样一段声明:

。通常,每当执行 spark sql 聚合或联接以随机播放数据时,这就是生成的分区数 = 200。 这是由 spark.sql.shuffle.partitions 设置的。...

所以,我的问题是:

  • 我们的意思是,例如,如果我们为 DF 将分区设置为 765,
  • 处理发生在 765 个分区上,但输出被合并/重新分区标准地为 200 - 这里指的是单词结果
  • 还是在合并/重新分区后使用 200 个分区进行处理,然后加入,聚合?

我问,因为我从来没有看到一个清晰的观点。

我做了以下测试:

// genned a DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count
sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The above not included on 1st run, the above included on 2nd run.
ds1.rdd.partitions.size
ds2.rdd.partitions.size
val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer") 
joined.rdd.partitions.size
joined.count
joined.rdd.partitions.size

在第一次测试中 -未定义sqlContext.setConf("spark.sql.shuffle.partitions","765">(,结果的处理和分区数为200。即使 SO post 45704156 声明它可能不适用于 DF - 这是一个 DS。

在第二个测试 -定义sqlContext.setConf("spark.sql.shuffle.partitions", ">765"( 中,处理和分区数结果为 765。即使 SO post 45704156 声明它可能不适用于 DF - 这是一个 DS。

这是你两个猜测的组合。

假设您有一组包含 M 分区的输入数据,并将随机分区设置为 N。

执行连接时,spark 会读取所有 M 分区中的输入数据,并根据 N 个分区的键重新洗牌数据。想象一个微不足道的哈希分区器,应用于键的哈希函数看起来很像 A = hashcode(key( % N,然后将这些数据重新分配给负责处理 Ath 分区的节点。每个节点可以负责处理多个分区。

洗牌后,节点将工作以聚合它们负责的分区中的数据。由于此处不需要进行额外的洗牌,因此节点可以直接生成输出。

因此,总而言之,您的输出将被合并为 N 个分区,但是它被合并是因为它是在 N 个分区中处理的,而不是因为 Spark 应用了一个额外的随机阶段来专门将输出数据重新分区到 N。

Spark.sql.shuffle.partitions是决定分区数量的参数,同时进行连接或聚合等随机操作,即数据在节点之间移动的位置。另一部分spark.default.parallelism将根据你的数据大小和最大块大小计算,在HDFS中它是128mb。因此,如果您的作业不执行任何随机操作,它将考虑默认的并行度值,或者如果您使用的是rdd,则可以自己设置它。当洗牌发生时,将需要 200。

Val df = sc.parallelize(List(1,2,3,4,5(,4(.toDF(( df.count((//这将使用 4 个分区

Val df1 = df df1.except(df(.count//将生成 200 个分区,具有 2 个阶段

最新更新