Why does a collect_set aggregation add an Exchange operator to a query on a bucketed table?



I'm using Spark 2.2 and doing a POC of Spark's bucketing. I've created a bucketed table; here is the output of desc formatted my_bucketed_tbl:

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|              bundle|              string|   null|
|                 ifa|              string|   null|
|               date_|                date|   null|
|                hour|                 int|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|     my_bucketed_tbl|       |
|               Owner|            zeppelin|       |
|             Created|Thu Dec 21 13:43:...|       |
|         Last Access|Thu Jan 01 00:00:...|       |
|                Type|            EXTERNAL|       |
|            Provider|                 orc|       |
|         Num Buckets|                  16|       |
|      Bucket Columns|             [`ifa`]|       |
|        Sort Columns|             [`ifa`]|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|hdfs:/user/hive/w...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        OutputFormat|org.apache.hadoop...|       |
|  Storage Properties|[serialization.fo...|       |
+--------------------+--------------------+-------+
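
For reference, a table with this layout (16 buckets, bucketed and sorted by ifa, ORC provider, external location) could be produced with the DataFrameWriter bucketing API. This is only a minimal sketch under those assumptions; the source DataFrame name src and the path are illustrative, not the exact statement I used:

// Sketch: writing a 16-bucket, ifa-sorted external ORC table (names are illustrative)
src.write
  .format("orc")
  .bucketBy(16, "ifa")   // matches Num Buckets / Bucket Columns above
  .sortBy("ifa")         // matches Sort Columns above
  .option("path", "hdfs:///user/hive/warehouse/my_bucketed_tbl")  // external location
  .saveAsTable("my_bucketed_tbl")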

When I run an explain on a group by query, I can see that the exchange phase has been spared:

sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
== Physical Plan ==
SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
+- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
   +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
      +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>

However, when I replace Spark's max function with collect_set, the execution plan is the same as for a non-bucketed table, i.e. the exchange phase is not spared:

sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain
== Physical Plan ==
ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
+- Exchange hashpartitioning(ifa#1010, 200)
   +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
      +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>

Is there any configuration I've missed, or is this a current limitation of Spark's bucketing?

The issue has been fixed in version 2.2.1. You can find the JIRA issue here.
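
As a quick sanity check after upgrading, you can confirm the running version and re-run the explain; on 2.2.1 or later the Exchange should no longer appear for a group by on the bucketed column (a sketch, output not shown):

println(spark.version)   // expect 2.2.1 or later
sql("select ifa, collect_set(bundle) from my_bucketed_tbl group by ifa").explain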
