在数据框中添加一个新列,该列的值基于下一行的值



我有一个如下所示的数据框架,

+-----+----------+---------+-------+-------------------+
|jobid|fieldmname|new_value|coltype|           createat|
+-----+----------+---------+-------+-------------------+
|    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34|
|    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|
|    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|
|    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|
|    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|
|    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|
|    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34|
|    2| jobstatus|  sttaus2| status|2022-11-11 12:11:34|
+-----+----------+---------+-------+-------------------+
Seq(
(1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
(1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
(1, "jobstage", "sttaus3", "null", "2022-10-10 14:11:34"),
(1, "jobstatus", "sttaus4", "null", "2022-10-10 15:11:34"),
(1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
(1, "jobstatus", "sttaus11", null, "2022-10-10 17:11:34"),
(2, "jobstage", "sttaus1", "null", "2022-10-11 10:11:34"),
(2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34")
).toDF("jobid", "fieldmname", "new_value", "coltype", "createat")

只需要为fieldmname为"jobstage"的行添加新列和值。该值应该是对应作业阶段的最新状态(签入下一行)。当选择最新的需要检查coltype值,如果它是&;status&;

预期dataframe:

+-----+----------+---------+-------+-------------------+-------------+
|jobid|fieldmname|new_value|coltype|           createat|latest_status|
+-----+----------+---------+-------+-------------------+-------------+
|    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34|      sttaus2|
|    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|             |
|    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|     sttaus10|
|    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|             |
|    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|             |
|    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|             |
|    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34|      sttaus2|
|    2| jobstatus|  sttaus2| status|2022-11-11 12:11:34|             |
+-----+----------+---------+-------+-------------------+-------------+

我尝试了lead, lag, row_number,但没有得到预期的结果。

这个问题被标记为pyspark,所以我正在编写一种使用first()窗口函数在pyspark中执行所需操作的方法。

data_sdf. 
withColumn('latest',
func.when(func.col('fieldmname') == 'jobstage', 
func.first(func.when((func.col('coltype') == 'status') & (func.col('fieldmname') == 'jobstatus'), func.col('new_value')), ignorenulls=True).
over(wd.partitionBy('jobid').orderBy('createat').rowsBetween(0, sys.maxsize))
).
otherwise(func.lit(''))
). 
show()
# +-----+----------+---------+-------+-------------------+--------+
# |jobid|fieldmname|new_value|coltype|           createat|  latest|
# +-----+----------+---------+-------+-------------------+--------+
# |    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34| sttaus2|
# |    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|        |
# |    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|sttaus10|
# |    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|        |
# |    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|        |
# |    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|        |
# |    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34| sttaus2|
# |    2| jobstatus|  sttaus2| status|2022-11-10 12:11:34|        |
# +-----+----------+---------+-------+-------------------+--------+

因此,它将考虑相应记录中的第一条记录,其中fieldmname为"jobstatus"coltype为"状态"。

相关内容

  • 没有找到相关文章

最新更新