我在Hive中有这样的视图:
id sequencenumber appname
242539622 1 A
242539622 2 A
242539622 3 A
242539622 4 B
242539622 5 B
242539622 6 C
242539622 7 D
242539622 8 D
242539622 9 D
242539622 10 B
242539622 11 B
242539622 12 D
242539622 13 D
242539622 14 F
我希望每个 id 都有以下视图:
id sequencenumber appname appname_c
242539622 1 A A
242539622 2 A A
242539622 3 A A
242539622 4 B B_1
242539622 5 B B_1
242539622 6 C C
242539622 7 D D_1
242539622 8 D D_1
242539622 9 D D_1
242539622 10 B B_2
242539622 11 B B_2
242539622 12 D D_2
242539622 13 D D_2
242539622 14 F F
或任何与此类似的内容,可以识别序列中给定事件的重新发生。
我的最终目标是计算在每组事件中花费的时间(如果您愿意,可以在马尔可夫建模的上下文中声明),同时考虑到是否存在任何循环。例如,在上面示例中花费在B_1上的时间可以与B_2非常相似。
在 Hive(链接)中搜索了窗口函数,但我认为它们无法像 R/Python 那样进行逐行比较。
使用 Hive 窗口函数的解决方案。我使用您的数据对其进行了测试,删除your_table
CTE 并改用您的表格。结果符合预期。
with your_table as (--remove this CTE, use your table instead
select stack(14,
'242539622', 1,'A',
'242539622', 2,'A',
'242539622', 3,'A',
'242539622', 4,'B',
'242539622', 5,'B',
'242539622', 6,'C',
'242539622', 7,'D',
'242539622', 8,'D',
'242539622', 9,'D',
'242539622',10,'B',
'242539622',11,'B',
'242539622',12,'D',
'242539622',13,'D',
'242539622',14,'F'
) as (id,sequencenumber,appname)
) --remove this CTE, use your table instead
select id,sequencenumber,appname,
case when sum(new_grp_flag) over(partition by id, group_name) = 1 then appname --only one group of consequent runs exists (like A)
else
nvl(concat(group_name, '_',
sum(new_grp_flag) over(partition by id, group_name order by sequencenumber) --rolling sum of new_group_flag
),appname)
end appname_c
from
(
select id,sequencenumber,appname,
case when appname=prev_appname or appname=next_appname then appname end group_name, --identify group of the same app
case when appname<>prev_appname or prev_appname is null then 1 end new_grp_flag --one 1 per each group
from
(
select id,sequencenumber,appname,
lag(appname) over(partition by id order by sequencenumber) prev_appname, --need these columns
lead(appname) over(partition by id order by sequencenumber) next_appname --to identify groups of records w same app
from your_table --replace with your table
)s
)s
order by id,sequencenumber
;
结果:
OK
id sequencenumber appname appname_c
242539622 1 A A
242539622 2 A A
242539622 3 A A
242539622 4 B B_1
242539622 5 B B_1
242539622 6 C C
242539622 7 D D_1
242539622 8 D D_1
242539622 9 D D_1
242539622 10 B B_2
242539622 11 B B_2
242539622 12 D D_2
242539622 13 D D_2
242539622 14 F F
Time taken: 232.319 seconds, Fetched: 14 row(s)
您需要执行 2 个窗口函数才能实现该结果。
使用 pyspark 并假设df
是您的数据帧:
from pyspark.sql import functions as F, Window
df.withColumn(
"fg",
F.lag("appname").over(Window.partitionBy("id").orderBy("sequencenumber)
).withColumn(
"fg",
F.when(
F.col("fg")==F.col("id"),
0
).otherwise(1)
).withColumn(
"fg",
F.sum("fg").over(Window.partitionBy("id", "appname"))
).show()