我的问题:为什么Spark计算每个分区的sum
和count
,进行不必要的(恕我直言(洗牌(Exchange hashpartitioning
(,然后计算HashAggregate
的平均值?
可以做什么:计算每个分区的平均值,然后合并(并集(结果。
详:
我正在从下面定义的 Hive 表中读取数据,该表按日期分区。
spark.sql("""Create External Table If Not Exists daily_temp.daily_temp_2014
(
state_name string,
...
) Partitioned By (
date_local string
)
Location "./daily_temp/"
Stored As ORC""")
它包括从EPA网站下载的美国各个点的每日温度测量。
使用以下代码,将数据从Hive表加载到PySpark DataFrame中:
spark = (
SparkSession.builder
.master("local")
.appName("Hive Partition Test")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.getOrCreate()
)
my_df = spark.sql("select * from daily_temp.daily_temp_2014")
我想计算每个州的每日平均温度。
daily_state_mean = (
my_df
.groupBy(
my_df.date_local,
my_df.state_name
)
.agg({"arithmetic_mean":"mean"})
)
这是物理(执行(计划的一部分:
+- *(2) HashAggregate(keys=[date_local#3003, state_name#2998], functions=[avg(cast(arithmetic_mean#2990 as double))], output=[date_local#3003, state_name#2998, avg(CAST(arithmetic_mean AS DOUBLE))#3014])
+- Exchange hashpartitioning(date_local#3003, state_name#2998, 365)
+- *(1) HashAggregate(keys=[date_local#3003, state_name#2998], functions=[partial_avg(cast(arithmetic_mean#2990 as double))], output=[date_local#3003, state_name#2998, sum#3021, count#3022L])
+- HiveTableScan [arithmetic_mean#2990, state_name#2998, date_local#3003], HiveTableRelation `daily_temp`.`daily_temp_2014`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [...], [date_local#3003]
非常感谢您的建议和见解。
这里没有什么意外。Spark SQL尚未保留外部源的分区信息。
如果要优化随机播放,则必须CLUSTER BY
/bucketBy
数据。如果这样做,分区信息将用于优化随机播放。
参考 如何定义数据帧的分区?