Hive SQL 将查询应用于许多列



我对Hive SQL很陌生,我正在尝试将查询应用于许多列。下面是查询:

select good_at_name, cnt
, case when cnt <= char_perc[0] then 0
when cnt <= char_perc[1] then 1
when cnt <= char_perc[2] then 2
else 3
end as char_percentile_rank
from (
select good_at_name, cnt
, PERCENTILE(BIGINT(cnt),array(0.25, 0.5, 0.75, 1)) over () as c_perc 
from (
select good_at_name
, sum(cnt) as cnt             
from good_at_name_walmart
group by good_at_name            
) t1
) t2

基本上,此查询将基于现有列创建一个新列,并为每个数据值返回 4 组百分位数(%25、%50、%75 和 %100(。(统计中的第 1 季度、第 2 季度、第 3 季度、第 4 季度(。这是我的意见:

In [182]: data_set
Out[182]: 
c_1  C_2  ...  C_1000
0  2    3          2
1  1    1          1    
2  2    2          0    
3  2    5          1     
4  4    1          3      

因此,当我应用该查询时,我会得到:

In [182]: result
Out[182]: 
c_1  c_perc  C_2   ...   C_1000 
0  1    0       1             0     
1  2    1       1             1     
2  2    1       2             1     
3  2    1       3             2     
4  4    3       5             3     

它只返回一列的c_perc,如c_1.我正在寻找一种将此查询应用于所有列的方法,以获取此输出:

In [182]: result
Out[182]: 
c_1  c_perc1  C_2  c_perc2  ...  C_1000  c_perc1000
0  1      0      1      0            0         0
1  2      1      1      0            1         1
2  2      1      2      1            1         1
3  2      1      3      2            2         2
4  4      3      5      3            3         3  

任何在ScalaHiveSpark左右的实现将不胜感激。

有多种方法可以解决这个问题。在高层次上,可能是这样的:

  1. 将配置单元表加载到数据帧中
  2. 从数据帧中获取列df.schema.fields
  3. 对于每个字段,创建一个包含所有新列的新数据帧(必须重写 Hive 查询以激发 sql(

编辑:这是我一起扔的一个快速开始。抱歉,没有更多时间来充实它,但希望这说明了我的想法。

val schema = sql.types.StructType(List(
sql.types.StructField("lions", sql.types.IntegerType, nullable = false),
sql.types.StructField("tigers", sql.types.IntegerType, nullable = false),
sql.types.StructField("bears", sql.types.IntegerType, nullable = false)
))
val rows = Seq(
sql.Row(15,10,20),
sql.Row(20,20,30),
sql.Row(35,30,40),
sql.Row(40,40,50),
sql.Row(50,50,60)
)
val rdd = spark.sparkContext.parallelize(rows)
val df = spark.createDataFrame(rdd, schema)
df.createOrReplaceTempView("tbl")
val percentileDfs = df.schema.fields.map(field => {
spark.sql(s"select percentile(${field.name}, array(0.25, 0.5, 0.75, 1)) as ${field.name}_centiles from tbl")
})
percentileDfs.foreach(_.show())

输出:

+--------------------+
|      lions_centiles|
+--------------------+
|[20.0, 35.0, 40.0...|
+--------------------+ 
+--------------------+
|     tigers_centiles|
+--------------------+
|[20.0, 30.0, 40.0...|
+--------------------+
+--------------------+
|      bears_centiles|
+--------------------+
|[30.0, 40.0, 50.0...|
+--------------------+

看看我是如何遍历df.schema上的字段的?然后,您可能只是收集每个查询的输出,然后使用它来生成新列(查看 Spark 的withColumn(。

最新更新