从SPARK SQL Windows功能中获得意外的结果



似乎Spark SQL窗口函数无法正常工作。我正在Hadoop集群中经营Spark Job,HDFS块大小为128 MB,并且火花版1.5 CDH 5.5

我的要求:

如果有多个具有相同data_rfe_id的记录,则根据最大seq_id和maxiumum service_id

进行单个记录

我看到在原始数据中有一些具有相同data_rfe_id和相同seq_id的记录,因此,我使用窗口函数应用了row_number,以便我可以使用row_num === 1

过滤记录

但是,当拥有庞大的数据集时,似乎无法正常工作。我看到使用了同样的rownumber。

为什么这样发生?

在将窗口函数应用到数据框架上之前,我需要重新填充吗?

我希望每个data_rfe_id

都有唯一的等级号码

我只想使用窗口函数来实现此目的。

 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.rowNumber
 .....
scala> df.printSchema
root
 |-- transitional_key: string (nullable = true)
 |-- seq_id: string (nullable = true)
 |-- data_rfe_id: string (nullable = true)
 |-- service_id: string (nullable = true)
 |-- event_start_date_time: string (nullable = true)
 |-- event_id: string (nullable = true)

 val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id").desc,df("service_id").desc)
  val rankDF =df.withColumn("row_num",rowNumber.over(windowFunction))
  rankDF.select("data_rfe_id","seq_id","service_id","row_num").show(200,false)

预期结果:

 +------------------------------------+-----------------+-----------+-------+
  |data_rfe_id                         |seq_id           |service_id|row_num|
 +------------------------------------+-----------------+-----------+-------+
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826          |4039       |1     |
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821          |3356       |2     |
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802          |1857       |3     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2156       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2103       |2     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2083       |3     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2082       |4     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2076       |5     |

实际结果我根据上述代码得到:

 +------------------------------------+-----------------+-----------+-------+
 |data_rfe_id                          |seq_id           |service_id|row_num|
 +------------------------------------+-----------------+-----------+-------+
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826          |4039       |1     |
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821          |3356       |1     |
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802          |1857       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2156       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2103       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2083       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2082       |1     |
  |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2076       |1     |

有人可以向我解释为什么我会得到这些意外的结果?我该如何解决?

基本上您要排名,在desc顺序中具有seq_id和service_id。与您需要的范围添加范围。等级可能对您有用。以下是代码的片段:

val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id"),df("service_id")).desc().rangeBetween(-MAXNUMBER,MAXNUMBER))
val rankDF =df.withColumn( "rank", rank().over(windowFunction) )

当您使用旧版本的Spark不知道它是否有效。WindowsPec存在问题,这里有参考

相关内容

  • 没有找到相关文章

最新更新