当两个表的联接以相同的方式进行分段和排序时,Spark为什么要重新排序数据



我正在进行两个表的简单联接。

SELECT
a.user_id
FROM
table1 a
FULL OUTER JOIN
table2 b
ON a.user_id = b.user_id

table1table2都由user_id进行分块和排序,具有相同数量的分块,代码如下。

df.write
.mode("overwrite")
.format("parquet")
.bucketBy(4, "user_id")
.sortBy("user_id")
.option("path", "some/path/to/data/")
.option("compression", "snappy")
.saveAsTable("table1")

当我查看执行计划时,我看到Spark在FileScan之后仍然执行Sort步骤,我认为它不应该这样做。

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#4483L]
+- SortMergeJoin [user_id#4483L], [user_id#4485L], FullOuter
:- Sort [user_id#4483L ASC NULLS FIRST], false, 0
:  +- FileScan parquet default.table1[user_id#4483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://example/path_to_table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 4 out of 4
+- Sort [user_id#4485L ASC NULLS FIRST], false, 0
+- FileScan parquet default.table2[user_id#4485L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://example/path_to_table2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 4 out of 4

以前,如果我不对表进行bucket和排序,那么执行计划也包括Exchange的步骤,所以通过bucketing,它现在消除了Exchange的步骤,这肯定很好,但我希望我也可以消除Sort的步骤。

谢谢。

答案:总结自Pradeep yadav的答案

我需要在saveAsTable()之前重新分配数据。

df.write
.repartition(4, col("user_id"))
.mode("overwrite")
.format("parquet")
.bucketBy(4, "user_id")
.sortBy("user_id")
.option("path", "some/path/to/data/")
.option("compression", "snappy")
.saveAsTable("table1")

您可以阅读这篇详细的文章:https://towardsdatascience.com/best-practices-for-bucketing-in-spark-sql-ea9f23f7dd53特别是那sort呢它应该能够回答您的查询。

最新更新