将列添加到 Spark 数据帧的最大值小于当前记录的值



我有一个类似于以下的Spark数据框架:

id  claim_id                 service_date                  status   product
123 10606134411906233408    2018-09-17T00:00:00.000+0000    PD      blue
123 10606147900401009928    2019-01-24T00:00:00.000+0000    PD      yellow
123 10606160940704723994    2019-05-23T00:00:00.000+0000    RV      yellow
123 10606171648203079553    2019-08-29T00:00:00.000+0000    RJ      blue
123 10606186611407311724    2020-01-13T00:00:00.000+0000    PD      blue

请原谅我没有粘贴任何代码,因为什么都不起作用。我想添加一个新的列与前一行的max(service_date),其中状态为PD,当前行的乘积=前一行的乘积。

这很容易通过关联子查询完成,但效率不高,而且在Spark中不可行,因为不支持非对等连接。还要注意,LAG将无法工作,因为我并不总是需要直接的前一个记录(并且偏移量将是动态的)。

预期的输出应该是这样的:

id  claim_id                 service_date                  status   product     previous_service_date
123 10606134411906233408    2018-09-17T00:00:00.000+0000    PD      blue
123 10606147900401009928    2019-01-24T00:00:00.000+0000    PD      yellow
123 10606160940704723994    2019-05-23T00:00:00.000+0000    RV      yellow      2019-01-24T00:00:00.000+0000
123 10606171648203079553    2019-08-29T00:00:00.000+0000    RJ      blue        2018-09-17T00:00:00.000+0000
123 10606186611407311724    2020-01-13T00:00:00.000+0000    PD      blue        2018-09-17T00:00:00.000+0000

您可以尝试以下使用max作为when(一个大小写表达式)的窗口函数,但侧重于前面的行

from pyspark.sql import functions as F
from pyspark.sql import Window

df = df.withColumn('previous_service_date',F.max(
F.when(F.col("status")=="PD",F.col("service_date")).otherwise(None)
).over(
Window.partitionBy("product")
.rowsBetween(Window.unboundedPreceding,-1)
))
df.orderBy('service_date').show(truncate=False)
+---+--------------------+-------------------+------+-------+---------------------+
|id |claim_id            |service_date       |status|product|previous_service_date|
+---+--------------------+-------------------+------+-------+---------------------+
|123|10606134411906233408|2018-09-17 00:00:00|PD    |blue   |null                 |
|123|10606147900401009928|2019-01-24 00:00:00|PD    |yellow |null                 |
|123|10606160940704723994|2019-05-23 00:00:00|RV    |yellow |2019-01-24 00:00:00  |
|123|10606171648203079553|2019-08-29 00:00:00|RJ    |blue   |2018-09-17 00:00:00  |
|123|10606186611407311724|2020-01-13 00:00:00|PD    |blue   |2018-09-17 00:00:00  |
+---+--------------------+-------------------+------+-------+---------------------+

编辑1

您也可以使用last,如下所示

df = df.withColumn('previous_service_date',F.last(
F.when(F.col("status")=="PD" ,F.col("service_date")).otherwise(None),True
).over(
Window.partitionBy("product")
.orderBy('service_date')
.rowsBetween(Window.unboundedPreceding,-1)
))

让我知道这是否适合你。

您可以将您的DataFramecopy为新的DataFrame (df2)和join,如下所示:

(df.join(df2, 
on = [df.Service_date > df2.Service_date,
df.product == df2.product,
df2.status == 'PD'],
how = "left"))

删除重复的列,并将df2.Service_date重命名为previous_service_date

相关内容

  • 没有找到相关文章

最新更新