假设我有两个分区的数据框:
df1 = spark.createDataFrame(
[(x,x,x) for x in range(5)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
df2 = spark.createDataFrame(
[(x,x,x) for x in range(7)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
(方案1)如果我通过[key1,key2]加入它们的加入操作,则在每个分区中都执行无需洗牌的操作(结果dataFrame中的分区数是相同的):
x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3
(方案2),如果我通过[key1,key2,time] shuffle操作联合它们(结果dataframe中的分区数为200,由spark.sql.shuffle驱动。分区选项):
x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 200
同时通过[key1,key2,time]保留隔板数量的Groupby和Window操作
x = df1.groupBy('key1', 'key2', 'time').agg(F.count('*'))
assert x.rdd.getNumPartitions() == 3
我不明白这是一个错误,还是在第二种情况下执行洗牌操作的一些原因?如果可能的话,我该如何避免洗牌?
我想能够找出python和scala的不同结果的原因。
原因是广播优化。如果Spark-Shell是从python和Scala均可起作用的,并且Scala的工作相同。
./spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
val df1 = Seq(
(1, 1, 1)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))
val df2 = Seq(
(1, 1, 1),
(2, 2, 2)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))
val x = df1.join(df2, usingColumns = Seq("key1", "key2", "time"))
x.rdd.getNumPartitions == 200
因此,如 @user10938362所建议的,Spark 2.4.0无法从开箱即用的情况下优化所描述的情况。
顺便说一句。以下是有关编写催化剂优化器扩展的信息Div>
催化剂优化器的行为在Pyspark和Scala之间有所不同(至少使用Spark 2.4)。
我都跑了两个不同的计划。
的确,您在Pyspark中获得200个分区,除非您明确说明Pyspark:
spark.conf.set("spark.sql.shuffle.partitions", 3)
然后处理3个分区,因此在pyspark下保留了3个分区。
有些惊讶,因为我想在引擎盖下这很常见。所以人们一直在告诉我。它只是显示。
pyspark的物理计划,带有conf的参数集:
== Physical Plan ==
*(5) Project [key1#344L, key2#345L, time#346L]
+- SortMergeJoin [key1#344L, key2#345L, time#346L], [key1#350L, key2#351L, time#352L], LeftOuter
:- *(2) Sort [key1#344L ASC NULLS FIRST, key2#345L ASC NULLS FIRST, time#346L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key1#344L, key2#345L, time#346L, 3)
: +- *(1) Scan ExistingRDD[key1#344L,key2#345L,time#346L]
+- *(4) Sort [key1#350L ASC NULLS FIRST, key2#351L ASC NULLS FIRST, time#352L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key1#350L, key2#351L, time#352L, 3)
+- *(3) Filter ((isnotnull(key1#350L) && isnotnull(key2#351L)) && isnotnull(time#352L))
+- *(3) Scan ExistingRDD[key1#350L,key2#351L,time#352L]