pyspark数据帧上的复杂逻辑,包括前一行的现有值以及动态生成的前一行值



我必须在spark数据帧或rdd(最好是数据帧(上应用一个逻辑,它需要生成两个额外的列。第一个生成列依赖于同一行的其他列,而第二个生成列则依赖于前一行的第一个生成的列。

以下是问题陈述的表格形式。A和B列在数据帧中可用。将生成C和D列。

A |  B   | C            |     D
------------------------------------
1 | 100  |  default val |    C1-B1
2 | 200  |  D1-C1       |    C2-B2
3 | 300  |  D2-C2       |    C3-B3
4 | 400  |  D3-C3       |    C4-B4
5 | 500  |  D4-C4       |    C5-B5

这是样本数据

A |  B   |    C   |   D
------------------------
1 | 100  |   1000 |  900
2 | 200  |  -100  | -300
3 | 300  |  -200  | -500
4 | 400  |  -300  | -700
5 | 500  |  -400  | -900

我能想到的唯一解决方案是将输入数据帧合并为1,将其转换为rdd,然后将python函数(具有所有计算逻辑(应用于mapPartitions API。然而,这种方法可能会在一个执行器上产生负载。

lag((函数可以帮助您:

import pyspark.sql.functions as F
from pyspark.sql.window import Window
w =  Window.orderBy("A")
df1 = df1.withColumn("C", F.lit(1000))
df2 = (
df1
.withColumn("D", F.col("C") - F.col("B"))
.withColumn("C", 
F.when(F.lag("C").over(w).isNotNull(), 
F.lag("D").over(w) - F.lag("C").over(w))
.otherwise(F.col("C")))
.withColumn("D", F.col("C") - F.col("B"))
)

数学上看,D1-C1,其中D1=C1-B1;因此D1-C1将变为C1-B1-C1=&gt-B1。在pyspark中,窗口函数有一个名为default的参数。这将简化您的问题。试试这个:

import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([(1,100),(2,200),(3,300),(4,400),(5,500)],['a','b'])
w=Window.orderBy('a')
df_lag =df.withColumn('c',F.lag((F.col('b')*-1),default=1000).over(w))
df_final = df_lag.withColumn('d',F.col('c')-F.col('b'))

结果:

df_final.show()
+---+---+----+----+
|  a|  b|   c|   d|
+---+---+----+----+
|  1|100|1000| 900|
|  2|200|-100|-300|
|  3|300|-200|-500|
|  4|400|-300|-700|
|  5|500|-400|-900|
+---+---+----+----+

如果运算不是减法,那么同样的逻辑也适用——用默认值填充C列——计算D,然后使用滞后计算C并重新计算D。

最新更新