似乎Spark SQL窗口函数无法正常工作。我正在Hadoop集群中经营Spark Job,HDFS块大小为128 MB,并且火花版1.5 CDH 5.5
我的要求:
如果有多个具有相同data_rfe_id的记录,则根据最大seq_id和maxiumum service_id
进行单个记录我看到在原始数据中有一些具有相同data_rfe_id和相同seq_id的记录,因此,我使用窗口函数应用了row_number,以便我可以使用row_num === 1
过滤记录但是,当拥有庞大的数据集时,似乎无法正常工作。我看到使用了同样的rownumber。
为什么这样发生?
在将窗口函数应用到数据框架上之前,我需要重新填充吗?
我希望每个data_rfe_id
都有唯一的等级号码我只想使用窗口函数来实现此目的。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber
.....
scala> df.printSchema
root
|-- transitional_key: string (nullable = true)
|-- seq_id: string (nullable = true)
|-- data_rfe_id: string (nullable = true)
|-- service_id: string (nullable = true)
|-- event_start_date_time: string (nullable = true)
|-- event_id: string (nullable = true)
val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id").desc,df("service_id").desc)
val rankDF =df.withColumn("row_num",rowNumber.over(windowFunction))
rankDF.select("data_rfe_id","seq_id","service_id","row_num").show(200,false)
预期结果:
+------------------------------------+-----------------+-----------+-------+
|data_rfe_id |seq_id |service_id|row_num|
+------------------------------------+-----------------+-----------+-------+
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826 |4039 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821 |3356 |2 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802 |1857 |3 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2156 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2103 |2 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2083 |3 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2082 |4 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2076 |5 |
实际结果我根据上述代码得到:
+------------------------------------+-----------------+-----------+-------+
|data_rfe_id |seq_id |service_id|row_num|
+------------------------------------+-----------------+-----------+-------+
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826 |4039 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821 |3356 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802 |1857 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2156 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2103 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2083 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2082 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2076 |1 |
有人可以向我解释为什么我会得到这些意外的结果?我该如何解决?
基本上您要排名,在desc顺序中具有seq_id和service_id。与您需要的范围添加范围。等级可能对您有用。以下是代码的片段:
val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id"),df("service_id")).desc().rangeBetween(-MAXNUMBER,MAXNUMBER))
val rankDF =df.withColumn( "rank", rank().over(windowFunction) )
当您使用旧版本的Spark不知道它是否有效。WindowsPec存在问题,这里有参考