How to apply window-function-style logic in PySpark to grouped data that needs filtering within the aggregation



I have a complex windowing operation that I need help with in PySpark.

I have some data grouped by src and dest, and for each group I need to do the following:

- select only the rows whose socket2 value does not appear in socket1 anywhere within that group
- after applying that filter criterion, sum the amounts field

amounts  src  dest  socket1  socket2
10       1    2     A        B
11       1    2     B        C
12       1    2     C        D
510      1    2     C        D
550      1    2     B        C
500      1    2     A        B
80       1    3     A        B

I want to aggregate it in the following way: 510 + 12 = 522, and 80 is the only record with src = 1, dest = 3:

amounts  src  dest
522      1    2
80       1    3

I borrowed the sample data from here: How to write a pyspark UDAF on multiple columns?

You can split the dataframe into two, one keeping socket1 and the other keeping socket2, and then use a leftanti join instead of filtering (available in Spark >= 2.0).

First, let's create the dataframe:

df = spark.createDataFrame(
    sc.parallelize([
        [10,1,2,"A","B"],
        [11,1,2,"B","C"],
        [12,1,2,"C","D"],
        [510,1,2,"C","D"],
        [550,1,2,"B","C"],
        [500,1,2,"A","B"],
        [80,1,3,"A","B"]
    ]), 
    ["amounts","src","dest","socket1","socket2"]
)

Now let's split the dataframe:

Spark >= 2.0

# df1 keeps socket1 as a generic "socket" column, df2 keeps socket2
df1 = df.withColumnRenamed("socket1", "socket").drop("socket2")
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
# leftanti keeps only the rows of df2 whose (src, dest, socket) has no match in df1
res = df2.join(df1, ["src", "dest", "socket"], "leftanti")
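At this point res should contain only the rows of df2 whose socket value never appears as a socket1 within the same (src, dest) group (the two D rows plus the single dest = 3 row); you can check the intermediate result before aggregating (row order may vary):

res.show()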

Spark 1.6

# Same split, but rename amounts in df1 so unmatched rows can be detected after the join
df1 = df.withColumnRenamed("socket1", "socket").drop("socket2").withColumnRenamed("amounts", "amounts1")
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
# A left join followed by an IS NULL filter emulates the leftanti join
res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 IS NULL").drop("amounts1")

And finally the aggregation:

import pyspark.sql.functions as psf
res.groupBy("src", "dest").agg(
    psf.sum("amounts").alias("amounts")
).show()
    +---+----+-------+
    |src|dest|amounts|
    +---+----+-------+
    |  1|   3|     80|
    |  1|   2|    522|
    +---+----+-------+
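
Since the question mentions window functions: as an alternative sketch (not part of the original answer; assumes Spark >= 2.0 so that collect_set can be used over a window), you could avoid the self-join by collecting each group's socket1 values with a window and filtering on set membership:

import pyspark.sql.functions as psf
from pyspark.sql import Window

# Collect, for every (src, dest) group, the set of socket1 values seen in that group
w = Window.partitionBy("src", "dest")

res_w = (
    df
    .withColumn("socket1_set", psf.collect_set("socket1").over(w))
    # keep only rows whose socket2 never occurs as a socket1 in the same group
    .filter("NOT array_contains(socket1_set, socket2)")
    .groupBy("src", "dest")
    .agg(psf.sum("amounts").alias("amounts"))
)
res_w.show()

This should produce the same 522 / 80 totals, at the cost of materializing the collected set on every row.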
