Pyspark SQL查询以获取特定列的 /- 20%的行



我有以下pyspark df:

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201542399349300629| 3979760| 850914|
|201542399349300634| 3402687|1983568|
|201542399349300724| 1138291|1097553|
|201522369349300122| 1401406|1010828|
|201522369349300137|   16948| 171534|
|201522369349300142|13474056|2285323|
|201522369349300202|  481045| 241788|
|201522369349300207|  700861|1185640|
|201522369349300227|  178479| 267976|
+------------------+--------+-------+

对于每一行,我希望能够获得资产金额20%以内的行。例如,对于第一行(ID = 201542399349300619),我希望能够获得资产在20%/-的1,633,944以内的所有行(因此,在1,307,155至1,960,732之间)

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201522369349300122| 1401406|1010828|

使用此子集表,我想获取平均资产并将其添加为新列。因此,在上面的示例中,它将是(1633944 1401406)的平均资产= 1517675

+------------------+--------+-------+---------+
|                ID|  Assets|Revenue|AvgAssets|
+------------------+--------+-------+---------+
|201542399349300619| 1633944|  32850|  1517675|

假设您的dataframe具有类似于以下数据的架构(即AssetsRevenue是数字):

df.printSchema()
#root
# |-- ID: long (nullable = true)
# |-- Assets: integer (nullable = true)
# |-- Revenue: integer (nullable = true)

您可以在您提出的条件下加入数据框架。加入后,您可以通过占用Assets列的平均值进行分组和汇总。

例如:

from pyspark.sql.functions import avg, expr
df.alias("l")
    .join(
        df.alias("r"), 
        on=expr("r.assets between l.assets*0.8 and l.assets*1.2")
    )
    .groupBy("l.ID", "l.Assets", "l.Revenue")
    .agg(avg("r.Assets").alias("AvgAssets"))
    .show()
#+------------------+--------+-------+------------------+
#|                ID|  Assets|Revenue|         AvgAssets|
#+------------------+--------+-------+------------------+
#|201542399349300629| 3979760| 850914|         3691223.5|
#|201522369349300202|  481045| 241788|          481045.0|
#|201522369349300207|  700861|1185640|          700861.0|
#|201522369349300137|   16948| 171534|           16948.0|
#|201522369349300142|13474056|2285323|       1.3474056E7|
#|201522369349300227|  178479| 267976|          178479.0|
#|201542399349300619| 1633944|  32850|         1517675.0|
#|201522369349300122| 1401406|1010828|1391213.6666666667|
#|201542399349300724| 1138291|1097553|         1138291.0|
#|201542399349300634| 3402687|1983568|         3691223.5|
#+------------------+--------+-------+------------------+

由于我们将数据框架加入本身,因此我们可以使用别名参考左表("l")和右表("r")。上面的逻辑说将l加入r,条件是r中的资产是l中资产的 /20%。

有多种表达 /20%条件的方法,但是我使用SPARK-SQL between表达式来查找Assets * 0.8Assets * 1.2之间的行。

然后,我们在左表的所有列(groupBy)上汇总,并在右表中的资产上平均。

生成的AvgAssets列是FloatType列,但是如果您喜欢的话,您可以通过在.alias("AvgAssets")之前添加.cast("int")来轻松将其转换为IntegerType


另请参见:

  • Spark中的各种连接类型是什么?

相关内容

  • 没有找到相关文章

最新更新