如何基于ID列逐行(或逐窗口)更新spark数据框



我有一个DF与列:TXN_ID, SMO_ID, FLAG值:

12,340,null
12,56,null
12,353,null
13,340,null
13,56,null
13,353,null

问:按TXN_ID的升序排列DF。然后明智地更新事务id,而不是一次更新所有行。

1日更新:设置colFLAG为'Y'对SMO_ID的最小值(字符串列,因此按词法排序,即340将是最小的),其中FLAG为null或已经'Y'

2:更新完整DF中的所有其他行并设置FLAG='N',而SMO_ID在上面的步骤

中被更新为'Y'。这个场景很棘手,因为更新需要按照transaction_id的方式完成,而第二次更新取决于第一次更新的值。

我尝试使用forloop, foreach与withColumn,但这些都不是工作选项。另外,spark sql不支持update操作。

是否有办法实现这种状态更新?

输出看起来像:以TXN_ID=12为例,因为它应该首先按顺序更新。

update的第一部分看起来像

12,340,Y
12,353,null
12,56,null

现在,对于前面行中出现的其他SMO_ID=340, FLAG应该设置为n[此处完整的DF在范围内]

12,340,Y
12,353,null
12,56,null
13,340,N
13,56,null
13,353,null

同样,对于TXN_ID=13…

可以计算最小值,然后按自然顺序为每一行分配行数,并更改与最小值相关的行:

val df = Seq(
(12, "340", null.asInstanceOf[String]),
(12, "56", null.asInstanceOf[String]),
(12, "353", null.asInstanceOf[String]),
(13, "340", null.asInstanceOf[String]),
(13, "56", null.asInstanceOf[String]),
(13, "353", null.asInstanceOf[String])
).toDF("TXN_ID", "SMO_ID", "FLAG")
val minValue = df.select(min($"SMO_ID")).as(Encoders.STRING).first()
val smoIdWindow = Window.partitionBy("SMO_ID").orderBy("naturalOrder")
df
.withColumn("naturalOrder", monotonically_increasing_id())
.withColumn("rownum", row_number().over(smoIdWindow))
.withColumn("FLAG",
when($"SMO_ID" === lit(minValue),
when($"rownum" === 1, "Y").otherwise("N")
).otherwise($"FLAG")
)
.drop("naturalOrder","rownum")

输出:

+------+------+----+
|TXN_ID|SMO_ID|FLAG|
+------+------+----+
|12    |353   |null|
|13    |353   |null|
|12    |340   |Y   |
|13    |340   |N   |
|12    |56    |null|
|13    |56    |null|
+------+------+----+

最新更新