I have a dataframe like the one shown below,
+-----+----------+---------+-------+-------------------+
|jobid|fieldmname|new_value|coltype| createat|
+-----+----------+---------+-------+-------------------+
| 1| jobstage| sttaus1| null|2022-10-10 12:11:34|
| 1| jobstatus| sttaus2| status|2022-10-10 13:11:34|
| 1| jobstage| sttaus3| null|2022-10-10 14:11:34|
| 1| jobstatus| sttaus4| null|2022-10-10 15:11:34|
| 1| jobstatus| sttaus10| status|2022-10-10 16:11:34|
| 1| jobstatus| sttaus11| null|2022-10-10 17:11:34|
| 2| jobstage| sttaus1| null|2022-10-11 10:11:34|
| 2| jobstatus| sttaus2| status|2022-11-11 12:11:34|
+-----+----------+---------+-------+-------------------+
Seq(
(1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
(1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
(1, "jobstage", "sttaus3", "null", "2022-10-10 14:11:34"),
(1, "jobstatus", "sttaus4", "null", "2022-10-10 15:11:34"),
(1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
(1, "jobstatus", "sttaus11", null, "2022-10-10 17:11:34"),
(2, "jobstage", "sttaus1", "null", "2022-10-11 10:11:34"),
(2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34")
).toDF("jobid", "fieldmname", "new_value", "coltype", "createat")
I need to add a new column, with a value, only for the rows where fieldmname is "jobstage". The value should be the latest status for that job stage (looking at the following rows). When picking the latest status, the coltype value has to be checked: it must be "status".
Expected dataframe:
+-----+----------+---------+-------+-------------------+-------------+
|jobid|fieldmname|new_value|coltype| createat|latest_status|
+-----+----------+---------+-------+-------------------+-------------+
| 1| jobstage| sttaus1| null|2022-10-10 12:11:34| sttaus2|
| 1| jobstatus| sttaus2| status|2022-10-10 13:11:34| |
| 1| jobstage| sttaus3| null|2022-10-10 14:11:34| sttaus10|
| 1| jobstatus| sttaus4| null|2022-10-10 15:11:34| |
| 1| jobstatus| sttaus10| status|2022-10-10 16:11:34| |
| 1| jobstatus| sttaus11| null|2022-10-10 17:11:34| |
| 2| jobstage| sttaus1| null|2022-10-11 10:11:34| sttaus2|
| 2| jobstatus| sttaus2| status|2022-11-11 12:11:34| |
+-----+----------+---------+-------+-------------------+-------------+
I tried lead, lag, and row_number, but didn't get the expected result.
Since this question is tagged pyspark, here is a way to do the required operation in PySpark using the first() window function.
import sys

import pyspark.sql.functions as func
from pyspark.sql.window import Window as wd

data_sdf. \
    withColumn('latest',
               func.when(func.col('fieldmname') == 'jobstage',
                         func.first(func.when((func.col('coltype') == 'status') & (func.col('fieldmname') == 'jobstatus'),
                                              func.col('new_value')),
                                    ignorenulls=True).
                         over(wd.partitionBy('jobid').orderBy('createat').rowsBetween(0, sys.maxsize))
                         ).
               otherwise(func.lit(''))
               ). \
    show()
# +-----+----------+---------+-------+-------------------+--------+
# |jobid|fieldmname|new_value|coltype| createat| latest|
# +-----+----------+---------+-------+-------------------+--------+
# | 1| jobstage| sttaus1| null|2022-10-10 12:11:34| sttaus2|
# | 1| jobstatus| sttaus2| status|2022-10-10 13:11:34| |
# | 1| jobstage| sttaus3| null|2022-10-10 14:11:34|sttaus10|
# | 1| jobstatus| sttaus4| null|2022-10-10 15:11:34| |
# | 1| jobstatus| sttaus10| status|2022-10-10 16:11:34| |
# | 1| jobstatus| sttaus11| null|2022-10-10 17:11:34| |
# | 2| jobstage| sttaus1| null|2022-10-11 10:11:34| sttaus2|
# | 2| jobstatus| sttaus2| status|2022-11-10 12:11:34| |
# +-----+----------+---------+-------+-------------------+--------+
So, for each "jobstage" row, it takes the first record at or after it (within the same jobid, ordered by createat) where fieldmname is "jobstatus" and coltype is "status".
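To make the window semantics concrete, the same "first matching row at or after the current row, per jobid" logic can be sketched in plain Python (an illustrative helper, not part of the PySpark answer; the function name latest_status is made up here):

import sys  # only needed if you mirror the sys.maxsize bound; unused in this sketch

# The sample rows from the question, assumed sorted by (jobid, createat).
rows = [
    (1, "jobstage",  "sttaus1",  "null",   "2022-10-10 12:11:34"),
    (1, "jobstatus", "sttaus2",  "status", "2022-10-10 13:11:34"),
    (1, "jobstage",  "sttaus3",  "null",   "2022-10-10 14:11:34"),
    (1, "jobstatus", "sttaus4",  "null",   "2022-10-10 15:11:34"),
    (1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
    (1, "jobstatus", "sttaus11", None,     "2022-10-10 17:11:34"),
    (2, "jobstage",  "sttaus1",  "null",   "2022-10-11 10:11:34"),
    (2, "jobstatus", "sttaus2",  "status", "2022-11-10 12:11:34"),
]

def latest_status(rows):
    # For each "jobstage" row, scan forward (current row included) within
    # the same jobid and take the new_value of the first row whose
    # fieldmname == "jobstatus" and coltype == "status"; other rows get ''.
    out = []
    for i, (jobid, field, _value, _coltype, _created) in enumerate(rows):
        latest = ""
        if field == "jobstage":
            for jid2, f2, v2, c2, _ in rows[i:]:
                if jid2 == jobid and f2 == "jobstatus" and c2 == "status":
                    latest = v2
                    break
        out.append(latest)
    return out

print(latest_status(rows))
# ['sttaus2', '', 'sttaus10', '', '', '', 'sttaus2', '']

This mirrors rowsBetween(0, sys.maxsize): the frame starts at the current row and extends to the end of the partition, and first(..., ignorenulls=True) keeps only the first non-null match.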