Spark SQL top-N query with control flow



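I have a DataFrame. When a row meets a rule's requirements, I want my query to return that row; otherwise it should re-sort the data and return the first row. I don't know how to do this.
The DataFrame, newtable, looks like this: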

+--------------------------+--------------+-------+-------+-------------------------+
|_id                       |relatedID     |related|u      |pro                      |
+--------------------------+--------------+-------+-------+-------------------------+
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[name,100,yyj196,0.8]    |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[age,102,21,0.9]         |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[favorite,102,IT,0.7]    |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[name,100,yyj196,0.8]    |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[age,102,21,0.9]         |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[favorite,102,IT,0.7]    |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[name,100,yyj2447005,0.5]|
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[age,101,21,0.5]         |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[favorite,102,iphone,0.5]|
+--------------------------+--------------+-------+-------+-------------------------+

It is the result of joining two other DataFrames.

This is the schema:

root
|-- _id: struct (nullable = true)
|    |-- oid: string (nullable = true)
|-- relatedID: array (nullable = true)
|    |-- element: integer (containsNull = true)
|-- related: integer (nullable = true)
|-- u: integer (nullable = true)
|-- pro: struct (nullable = true)
|    |-- fieldID: string (nullable = true)
|    |-- sourceID: string (nullable = true)
|    |-- value: string (nullable = true)
|    |-- weight: double (nullable = true)

This is the Scala code:

// join the two DataFrames and register the result as temp view "newtable"
dfsU.join(dfsu, dfsU("related") === dfsu("u"), "inner")
  .createTempView("newtable")
// test: this query returns the data displayed above
val checkdata = spark.sql("select * from newtable where related = 196 or related = 2447005 or u = 196 or u = 2447005")
checkdata.show(false)
checkdata.printSchema()
// group by _id and pro.fieldID, and assign ranks by descending weight
spark.sql("select *, Row_Number() OVER (partition by _id, pro.fieldID ORDER BY pro.weight desc) ranks FROM newtable")
  .createTempView("tmpview")
// test: get the data from tmpview
spark.sql("select * from tmpview where related = 196 or related = 2447005 or u = 196 or u = 2447005").show(false)

The result looks like this. It looks strange: it is not ordered by pro.weight.

+--------------------------+--------------+-------+-------+-------------------------+-----+
|_id                       |relatedID     |related|u      |pro                      |ranks|
+--------------------------+--------------+-------+-------+-------------------------+-----+
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[age,101,21,0.5]         |1    |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[favorite,102,iphone,0.5]|1    |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[name,100,yyj2447005,0.5]|1    |
+--------------------------+--------------+-------+-------+-------------------------+-----+

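Q1:
How do I get the row with the maximum pro.weight in each group, grouping by _id and pro.fieldID? What is wrong with my query?
Q2:
I also need to get the data from a specific sourceID for a given fieldID. For example, get [age,101,21,0.5] instead of [age,102,21,0.9], even though its weight is lower than 0.9 in that group, because sourceID == 101 has priority.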

if (pro.fieldID == "age" && pro.sourceID == "101") {
  // when the field is `age` and the `sourceID` matches, take this row:
  // [age,101,21,0.5]
  // every other field still gets its maximum-weight row
  // (group by pro.fieldID, sort by pro.weight descending, take the top one):
  // [name,100,yyj196,0.8]
  // [favorite,102,IT,0.7]
} else {
  // group by pro.fieldID, sort by pro.weight descending, take the top one;
  // every field gets its maximum-weight row:
  // [age,102,21,0.9]
  // [name,100,yyj196,0.8]
  // [favorite,102,IT,0.7]
}

How can I do this?
Thanks in advance.

Edit
More information:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions

val w = Window.partitionBy(tmp.col("_id"), tmp.col("pro.fieldID")).orderBy(functions.desc("pro.weight"))
// filter first, then apply the window function
tmp.where("related = 196 or related = 2447005 or u = 196 or u = 2447005").withColumn("rn", functions.row_number().over(w)).show(false)
println("----------------------")
// apply the window function first, then filter
tmp.withColumn("rn", functions.row_number().over(w)).where("related = 196 or related = 2447005 or u = 196 or u = 2447005").show(false)

Why are the results different? They use the same data and the same window function.
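A general point that may be relevant here: row_number() numbers only the rows that are in a partition at the moment the window is evaluated. If a filter runs before the window and removes some rows from a partition, the surviving rows can receive different numbers than when the window runs over the full table and the filter is applied afterwards. The sketch below models both orders with plain Scala collections instead of Spark; the `Row` case class, the `rowNumber` helper, and the two-row sample are hypothetical.

```scala
// Hypothetical, simplified stand-in for one row of `tmp`.
case class Row(id: String, fieldID: String, related: Int, weight: Double)

// Plain-Scala model of
//   row_number() OVER (PARTITION BY id, fieldID ORDER BY weight DESC)
// Pairs each row with its 1-based position inside its partition.
def rowNumber(rows: Seq[Row]): Seq[(Row, Int)] =
  rows
    .groupBy(r => (r.id, r.fieldID))
    .values
    .toList
    .flatMap(part => part.sortBy(-_.weight).zipWithIndex.map { case (r, i) => (r, i + 1) })

val rows = Seq(
  Row("a", "age", 196, 0.9),
  Row("a", "age", 2447005, 0.5)
)

// Filter first, then number: only the 2447005 row is left, so it gets rn = 1.
val filterThenRank = rowNumber(rows.filter(_.related == 2447005))

// Number first, then filter: the 0.9 row still occupies position 1, so the same row gets rn = 2.
val rankThenFilter = rowNumber(rows).filter { case (r, _) => r.related == 2447005 }
```

In this toy data the two pipelines disagree only on the rn value; they still return the same row.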

Data format:

root
|-- _id: struct (nullable = true)
|    |-- oid: string (nullable = true)
|-- relatedID: array (nullable = true)
|    |-- element: integer (containsNull = true)
|-- related: integer (nullable = true)
+--------------------------+--------------+-------+-------+-------------------------+---+
|_id                       |relatedID     |related|u      |pro                      |rn |
+--------------------------+--------------+-------+-------+-------------------------+---+
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[age,101,21,0.9]         |1  |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[name,100,yyj196,0.8]    |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[age,101,21,0.9]         |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[age,101,21,0.5]         |2  |
|[5c3f2de802353b0d870b05e0]|[196, 2542146]|196    |196    |[favorite,102,IT,0.7]    |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[favorite,102,IT,0.7]    |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[favorite,102,iphone,0.5]|2  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|196    |196    |[name,100,yyj196,0.8]    |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[name,100,yyj2447005,0.5]|2  |
+--------------------------+--------------+-------+-------+-------------------------+---+
----------------------
19/02/01 18:31:11 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimizations
+--------------------------+--------------+-------+-------+-------------------------+---+
|_id                       |relatedID     |related|u      |pro                      |rn |
+--------------------------+--------------+-------+-------+-------------------------+---+
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[age,101,21,0.5]         |1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[favorite,102,iphone,0.5]|1  |
|[5c3f2dd302353b0d870a7d2f]|[196, 2447005]|2447005|2447005|[name,100,yyj2447005,0.5]|1  |
+--------------------------+--------------+-------+-------+-------------------------+---+

Q1:

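Selecting rows from an ordered view without an ORDER BY is not guaranteed to return them in order. The SQL engine is free to choose whatever execution method is best from a performance point of view.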

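In general, I would not recommend sorting a view, for two reasons. First, it is the cause of your error: you would need to sort twice anyway, so sorting the view makes no sense. Second, sorting the filtered table is faster, because there are fewer rows to sort.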
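To make this concrete for Q1: a common pattern is to number the rows inside each (_id, pro.fieldID) group by descending weight, keep rn == 1, and put any ORDER BY you need on the final, already filtered result rather than on the view. The sketch below models that pipeline with plain Scala collections; `Pro`, `Rec`, `topByWeight`, and the sample rows (taken from the question's first table) are illustrative assumptions, and the comment shows roughly equivalent Spark DataFrame calls.

```scala
// Simplified stand-ins for the question's schema (hypothetical names).
case class Pro(fieldID: String, sourceID: String, value: String, weight: Double)
case class Rec(id: String, u: Int, pro: Pro)

// Roughly equivalent Spark code (needs spark-sql, plus
// org.apache.spark.sql.expressions.Window and org.apache.spark.sql.functions.{col, row_number}):
//   val w = Window.partitionBy(col("_id"), col("pro.fieldID")).orderBy(col("pro.weight").desc)
//   df.withColumn("rn", row_number().over(w)).where(col("rn") === 1).orderBy(col("_id"), col("pro.fieldID"))
def topByWeight(rows: Seq[Rec]): Seq[Rec] =
  rows
    .groupBy(r => (r.id, r.pro.fieldID)) // PARTITION BY _id, pro.fieldID
    .values
    .map(_.maxBy(_.pro.weight))          // ORDER BY pro.weight DESC, keep rn = 1
    .toSeq
    .sortBy(r => (r.id, r.pro.fieldID))  // explicit ORDER BY on the final result

val rows = Seq(
  Rec("5c3f2dd302353b0d870a7d2f", 196, Pro("name", "100", "yyj196", 0.8)),
  Rec("5c3f2dd302353b0d870a7d2f", 196, Pro("age", "102", "21", 0.9)),
  Rec("5c3f2dd302353b0d870a7d2f", 196, Pro("favorite", "102", "IT", 0.7)),
  Rec("5c3f2dd302353b0d870a7d2f", 2447005, Pro("name", "100", "yyj2447005", 0.5)),
  Rec("5c3f2dd302353b0d870a7d2f", 2447005, Pro("age", "101", "21", 0.5)),
  Rec("5c3f2dd302353b0d870a7d2f", 2447005, Pro("favorite", "102", "iphone", 0.5))
)

val top = topByWeight(rows)
```

The sort happens once, on three rows instead of six, which is the point made above about sorting the filtered table.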

Q2:

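If I understand correctly, you want to swap some rows/columns. You could look at withColumn(), or a simple map() containing an if statement, to transform the rows that satisfy certain conditions.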
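One possible reading of this suggestion for Q2: use the if to compute a priority value per row (in Spark, `when(...).otherwise(...)` inside `withColumn`, or an `if` inside `map`) and make it the first sort key of the window, ahead of the weight. An age row from sourceID 101 then wins its group even with a lower weight, while every other group falls back to the maximum weight. This is a plain-Scala sketch of that idea, not the answerer's code; `Pro`, `Rec`, `priority`, `pick`, and the sample rows are assumptions.

```scala
case class Pro(fieldID: String, sourceID: String, value: String, weight: Double)
case class Rec(id: String, pro: Pro)

// The if from the question's pseudocode, turned into a sort key:
// 0 = preferred row (age from sourceID 101), 1 = everything else.
// Spark analogue: when(col("pro.fieldID") === "age" && col("pro.sourceID") === "101", 0).otherwise(1)
def priority(r: Rec): Int =
  if (r.pro.fieldID == "age" && r.pro.sourceID == "101") 0 else 1

// Per (_id, fieldID) group: lowest priority first, then highest weight; keep the top row.
// Spark analogue: order the window by the priority column ascending, then col("pro.weight").desc, keep rn === 1
def pick(rows: Seq[Rec]): Map[(String, String), Rec] =
  rows
    .groupBy(r => (r.id, r.pro.fieldID))
    .map { case (key, group) => key -> group.minBy(r => (priority(r), -r.pro.weight)) }

val id1 = "5c3f2dd302353b0d870a7d2f" // has an age row from sourceID 101
val id2 = "5c3f2de802353b0d870b05e0" // does not
val rows = Seq(
  Rec(id1, Pro("age", "102", "21", 0.9)),
  Rec(id1, Pro("age", "101", "21", 0.5)),
  Rec(id1, Pro("name", "100", "yyj196", 0.8)),
  Rec(id1, Pro("name", "100", "yyj2447005", 0.5)),
  Rec(id2, Pro("age", "102", "21", 0.9)),
  Rec(id2, Pro("name", "100", "yyj196", 0.8))
)

val picked = pick(rows)
```

Folding the rule into the sort key keeps a single window pass, so no separate if/else branch over the whole DataFrame is needed.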
