我有以下格式的apache spark dataframe
| ID | groupId | phaseName |
|----|-----------|-----------|
| 10 | someHash1 | PhaseA |
| 11 | someHash1 | PhaseB |
| 12 | someHash1 | PhaseB |
| 13 | someHash2 | PhaseX |
| 14 | someHash2 | PhaseY |
每行代表一个阶段,该阶段发生在一个由其中几个阶段组成的过程中。ID
列表示相位的顺序顺序,而groupId
列显示了哪个阶段属于一起。
我想在数据框中添加一个新列:以前的plosphaseName。本列应指示与同一过程的以前的不同阶段。一个过程的第一阶段(具有最小ID的过程)将以null
为上一阶段。当一个阶段发生两次或更长时间时,第二个(第三...)的发生将具有相同的先前重点:例如:
df =
| ID | groupId | phaseName | prevPhaseName |
|----|-----------|-----------|---------------|
| 10 | someHash1 | PhaseA | null |
| 11 | someHash1 | PhaseB | PhaseA |
| 12 | someHash1 | PhaseB | PhaseA |
| 13 | someHash2 | PhaseX | null |
| 14 | someHash2 | PhaseY | PhaseX |
我不确定如何实施。我的第一种方法是:
- 创建第二个空数据库DF2
- DF中的每一行:
找到使用groupId = row.groupid,id<row.id和最大ID - 将此行添加到DF2
- 加入DF1和DF2
使用窗口函数的部分解决方案
我使用Window Functions
来汇总上一个阶段的名称,该组中当前阶段的先前发生的数量(不一定在一行中)以及当前和先前的阶段名称是否相等:
WindowSpec windowSpecPrev = Window
.partitionBy(df.col("groupId"))
.orderBy(df.col("ID"));
WindowSpec windowSpecCount = Window
.partitionBy(df.col("groupId"), df.col("phaseName"))
.orderBy(df.col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
df
.withColumn("prevPhase", functions.lag("phaseName", 1).over(windowSpecPrev))
.withColumn("phaseCount", functions.count("phaseId").over(windowSpecCount))
.withColumn("prevSame", when(col("prevPhase").equalTo(col("phaseName")),1).otherwise(0))
df =
| ID | groupId | phaseName | prevPhase | phaseCount | prevSame |
|----|-----------|-----------|-------------|------------|----------|
| 10 | someHash1 | PhaseA | null | 1 | 0 |
| 11 | someHash1 | PhaseB | PhaseA | 1 | 0 |
| 12 | someHash1 | PhaseB | PhaseB | 2 | 1 |
| 13 | someHash2 | PhaseX | null | 1 | 0 |
| 14 | someHash2 | PhaseY | PhaseX | 1 | 0 |
这仍然不是我想实现的目标,但现在足够好
进一步的想法
要获得上一个不同阶段的名称,我看到了三种可能没有彻底调查的可能性:
- 实现自己的
lag
函数,该功能不会偏移,而是递归检查上一行,直到找到与给定线不同的值。(尽管我认为无法在Spark SQL中使用自己的分析窗口函数) - 找到一种根据
phaseCount
的值动态设置lag
功能的偏移的方法。(如果以前的同一假名未出现以单个序列出现,则可能会失败) - 在存储第一个给定输入的ID和phasename的窗口上使用
UserDefinedAggregateFunction
,并寻求具有不同phasename的最高ID。
我能够通过以下方式解决此问题:
- 获得(普通的)上一个阶段。
- 引入一个新的ID,该ID将按顺序分组进行分组。(在此答案的帮助下)。这采取了两个步骤。首先检查当前和上一个阶段名称是否相等,并相应地分配一个组值。第二个计算此值的累积总和。
- 将顺序组的第一行分配给其所有成员。
实施
WindowSpec specGroup = Window.partitionBy(col("groupId"))
.orderBy(col("ID"));
WindowSpec specSeqGroupId = Window.partitionBy(col("groupId"))
.orderBy(col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
WindowSpec specPrevDiff = Window.partitionBy(col("groupId"), col("seqGroupId"))
.orderBy(col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
df.withColumn("prevPhase", coalesce(lag("phaseName", 1).over(specGroup), lit("NO_PREV")))
.withColumn("seqCount", when(col("prevPhase").equalTo(col("phaseName")).or(col("prevPhase").equalTo("NO_PREV")),0).otherwise(1))
.withColumn("seqGroupId", sum("seqCount").over(specSeqGroupId))
.withColumn("prevDiff", first("prevPhase").over(specPrevDiff));
结果
df =
| ID | groupId | phaseName | prevPhase | seqCount | seqGroupId | prevDiff |
|----|-----------|-----------|-----------|----------|------------|----------|
| 10 | someHash1 | PhaseA | NO_PREV | 0 | 0 | NO_PREV |
| 11 | someHash1 | PhaseB | PhaseA | 1 | 1 | PhaseA |
| 12 | someHash1 | PhaseB | PhaseA | 0 | 1 | PhaseA |
| 13 | someHash2 | PhaseX | NO_PREV | 0 | 0 | NO_PREV |
| 14 | someHash2 | PhaseY | PhaseX | 1 | 1 | PhaseX |
任何建议,特别是在这些操作的效率方面。
我想您可以使用Spark Window(行帧)功能。检查API文档和以下帖子。
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html