Hive query to find the count for missing weeks in between



I have a table similar to the one below:

id      week    count   
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30    
A100    201013  36    
A100    201015  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

Here we can see that the following weeks are missing:

  • First, 201014 is missing
  • Second, 201016 is missing
  • Third, weeks 201020, 201021 and 201022 are missing

My requirement is that whenever a week is missing, we need to show the previous week's count.

In this case, the output should be:

id      week    count
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30   
A100    201013  36    
A100    201014  36    
A100    201015  43    
A100    201016  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201020  63
A100    201021  63    
A100    201022  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

How can I implement this requirement using Hive/pyspark?

Although this answer is in Scala, the Python version would look almost identical and can easily be converted; a PySpark sketch of the same idea is included after the final output below.

Step 1:

Identify the rows that are followed by missing week values.

Sample input:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
//sample input
val input = sc.parallelize(List(("A100",201008,2), ("A100",201009,9),("A100",201014,4), ("A100",201016,45))).toDF("id","week","count")
scala> input.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201014|    4| //missing 4 rows
|A100|201016|   45| //missing 1 row
+----+------+-----+ 

To find them, we can use the lead() function on week and compute the difference between leadWeek and week. This difference (minus 1) should be 0; if it is greater than 0, rows are missing before the lead row.

val diffDF = input
  .withColumn("leadWeek", lead($"week", 1).over(Window.partitionBy($"id").orderBy($"week")))   // partitioning by id & computing lead()
  .withColumn("diff", ($"leadWeek" - $"week") -1)                                 // finding difference between leadWeek & week
scala> diffDF.show
+----+------+-----+--------+----+
|  id|  week|count|leadWeek|diff|
+----+------+-----+--------+----+
|A100|201008|    2|  201009|   0| // diff -> 0 represents that no rows needs to be added
|A100|201009|    9|  201014|   4| // diff -> 4 represents 4 rows are to be added after this row.
|A100|201014|    4|  201016|   1| // diff -> 1 represents 1 row to be added after this row.
|A100|201016|   45|    null|null|
+----+------+-----+--------+----+

Step 2:

  • If diff is >= 1: create and add diff number of new rows (InputWithDiff, see the case class below), incrementing the week value accordingly. Return the newly created rows along with the original row.
  • If diff is 0, no additional computation is needed. Return the original row as it is.

Convert diffDF to a Dataset for ease of computation.

case class InputWithDiff(id: Option[String], week: Option[Int], count: Option[Int], leadWeek: Option[Int], diff: Option[Int])
val diffDS = diffDF.as[InputWithDiff]
val output = diffDS.flatMap(x => {
 val diff = x.diff.getOrElse(0) 
 diff match {
  case n if n >= 1 => x :: (1 to diff).map(y => InputWithDiff(x.id, Some(x.week.get + y), x.count,x.leadWeek, x.diff)).toList  // create and append new Rows
  case _ => List(x)      // return as it is
 }
}).drop("leadWeek", "diff").toDF   // drop unnecessary columns & convert to DF

Final output:

scala> output.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201010|    9|
|A100|201011|    9|
|A100|201012|    9|
|A100|201013|    9|
|A100|201014|    4|
|A100|201015|    4|
|A100|201016|   45|
+----+------+-----+
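
For reference, here is a minimal PySpark sketch of the same lead()-based approach (an assumed conversion, not taken from the original answer; it uses an RDD flatMap in place of the typed Dataset, with the same sample input as above):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

input_df = spark.createDataFrame(
    [("A100", 201008, 2), ("A100", 201009, 9),
     ("A100", 201014, 4), ("A100", 201016, 45)],
    ["id", "week", "count"])

# Step 1: compute leadWeek and the number of missing weeks after each row
w = Window.partitionBy("id").orderBy("week")
diff_df = (input_df
           .withColumn("leadWeek", F.lead("week", 1).over(w))
           .withColumn("diff", F.col("leadWeek") - F.col("week") - 1))

# Step 2: expand each row into itself plus one filler row per missing week
def expand(row):
    diff = row["diff"] or 0
    filler = [(row["id"], row["week"] + i, row["count"]) for i in range(1, diff + 1)]
    return [(row["id"], row["week"], row["count"])] + filler

output = diff_df.rdd.flatMap(expand).toDF(["id", "week", "count"])
output.orderBy("id", "week").show()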

pyspark solution

Sample data

df = spark.createDataFrame([(1,201901,10),
                            (1,201903,9),
                            (1,201904,21),
                            (1,201906,42),
                            (1,201909,3),
                            (1,201912,56)
                           ],['id','weeknum','val'])
df.show()
+---+-------+---+
| id|weeknum|val|
+---+-------+---+
|  1| 201901| 10|
|  1| 201903|  9|
|  1| 201904| 21|
|  1| 201906| 42|
|  1| 201909|  3|
|  1| 201912| 56|
+---+-------+---+

1) The basic idea is to create all combinations of ids and weeks (ranging from the minimum to the maximum week) using a cross join.

from pyspark.sql.functions import min,max,sum,when
from pyspark.sql import Window
min_max_week = df.agg(min(df.weeknum),max(df.weeknum)).collect()
#Generate all weeks using range
all_weeks = spark.range(min_max_week[0][0],min_max_week[0][1]+1)
all_weeks = all_weeks.withColumnRenamed('id','weekno')
#all_weeks.show()
id_all_weeks = df.select(df.id).distinct().crossJoin(all_weeks).withColumnRenamed('id','aid')
#id_all_weeks.show()

2) After that, left joining the original dataframe onto these combinations helps identify the missing values.

res = id_all_weeks.join(df,(df.id == id_all_weeks.aid) & (df.weeknum == id_all_weeks.weekno),'left')
res.show()
+---+------+----+-------+----+
|aid|weekno|  id|weeknum| val|
+---+------+----+-------+----+
|  1|201911|null|   null|null|
|  1|201905|null|   null|null|
|  1|201903|   1| 201903|   9|
|  1|201904|   1| 201904|  21|
|  1|201901|   1| 201901|  10|
|  1|201906|   1| 201906|  42|
|  1|201908|null|   null|null|
|  1|201910|null|   null|null|
|  1|201912|   1| 201912|  56|
|  1|201907|null|   null|null|
|  1|201902|null|   null|null|
|  1|201909|   1| 201909|   3|
+---+------+----+-------+----+

3) Then use a combination of window functions: sum -> to assign groups, and max -> to fill the missing values once the groups have been assigned.

w1 = Window.partitionBy(res.aid).orderBy(res.weekno)
groups = res.withColumn("grp",sum(when(res.id.isNull(),0).otherwise(1)).over(w1))
w2 = Window.partitionBy(groups.aid,groups.grp)
missing_values_filled = groups.withColumn('filled',max(groups.val).over(w2)) #select required columns as needed
missing_values_filled.show() 
+---+------+----+-------+----+---+------+
|aid|weekno|  id|weeknum| val|grp|filled|
+---+------+----+-------+----+---+------+
|  1|201901|   1| 201901|  10|  1|    10|
|  1|201902|null|   null|null|  1|    10|
|  1|201903|   1| 201903|   9|  2|     9|
|  1|201904|   1| 201904|  21|  3|    21|
|  1|201905|null|   null|null|  3|    21|
|  1|201906|   1| 201906|  42|  4|    42|
|  1|201907|null|   null|null|  4|    42|
|  1|201908|null|   null|null|  4|    42|
|  1|201909|   1| 201909|   3|  5|     3|
|  1|201910|null|   null|null|  5|     3|
|  1|201911|null|   null|null|  5|     3|
|  1|201912|   1| 201912|  56|  6|    56|
+---+------+----+-------+----+---+------+
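
As noted in the comment above, the required columns can then be selected and renamed to match the desired output; a minimal sketch using the column names from this example:

final = missing_values_filled.select(
            missing_values_filled.aid.alias('id'),
            missing_values_filled.weekno.alias('weeknum'),
            missing_values_filled.filled.alias('val')
        ).orderBy('id','weeknum')
final.show()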

Hive query using the same logic as above (assuming a table of all weeks can be created):

select id,weeknum,max(val) over(partition by id,grp) as val
from (select i.id
            ,w.weeknum
            ,t.val
            ,sum(case when t.id is null then 0 else 1 end) over(partition by i.id order by w.weeknum) as grp 
      from (select distinct id from tbl) i
      cross join weeks_table w
      left join tbl t on t.id = i.id and w.weeknum = t.weeknum
     ) t
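
If no weeks table is available, one way to try the same query is to register the input and the week range generated in step 1 as temporary views and run it through spark.sql (a sketch; the view names tbl and weeks_table are chosen to match the query, and all_weeks exposes the weekno column created above):

# Register the earlier dataframes as temp views so the query can reference them
df.createOrReplaceTempView("tbl")
all_weeks.createOrReplaceTempView("weeks_table")   # single column: weekno

filled = spark.sql("""
select id, weeknum, max(val) over (partition by id, grp) as val
from (select i.id
            ,w.weekno as weeknum
            ,t.val
            ,sum(case when t.id is null then 0 else 1 end)
                 over (partition by i.id order by w.weekno) as grp
      from (select distinct id from tbl) i
      cross join weeks_table w
      left join tbl t on t.id = i.id and w.weekno = t.weeknum
     ) t
""")
filled.show()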
