在spark中使用Dataframe API查找最大平均值



我有一个表:

Country Emission
England 100
England 200
Germany 150
Germany 170

,我想计算每个国家的排放量平均值,然后找到一个平均值最大的国家(比如max(avg(Emission)))。我想打印计算值旁边的国家,如:Germany 160.

我知道我可以通过计算avg,按avg排序,然后打印第一行来做到这一点,但排序似乎不是最优的。我能承诺吗?

这是我写的排序解决方案:

spdf
.groupBy("Country")
.agg(f.avg("Emission").alias("Emission"))
.orderBy(f.desc("Emission"))
.limit(1)
.show()

,这是查询,它只返回max(avg(Emission))

spdf
.groupBy("Country")
.avg("Emission")
.agg(f.max("avg(Emission)").alias("Emission"))
.show()

这是真的,这是一个耻辱的排序,只是为了获得最大值。因此,您可以使用max函数。由于您希望保留国家的名称,而不仅仅是Emission的最大值,因此可以在struct中将这两个列绑定在一起。代码看起来像这样:

from pyspark.sql import functions as F
data=[("England", 100), ("England", 200), ("Germany", 150), ("Germany", 170)]
df = spark.createDataFrame(data, ["Country", "Emission"])
df.groupBy("Country")
.agg(F.avg("Emission").alias("Emission"))
.select(F.max(F.struct("Emission", "Country")).alias("s"))
.select("s.*")
.show()
+--------+-------+
|Emission|Country|
+--------+-------+
|   160.0|Germany|
+--------+-------+

第一个select选择排放量最大的国家。结果封装在一个名为"s"的结构体中。第二个select命令展开结果。

您也可以使用窗口函数代替group by。其思想也是构造一个结构体,获取该结构体的最大值并展开其元素。

df2 = df.selectExpr('struct(Country, avg(Emission) over (partition by Country) avg_emission) s') 
.selectExpr('max(s) s') 
.select('s.*')
df2.show()
+-------+------------+
|Country|avg_emission|
+-------+------------+
|Germany|       160.0|
+-------+------------+

最新更新