PySpark: computing a cumulative maximum by iterating over a window



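I have a DataFrame with a vehicle ID, a timestamp, and an odometer reading. Some odometer readings may be null. I want to create a new column that holds, for each vehicle ID, the current odometer reading at each timestamp; where the reading is null, it should use the previous non-null reading.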

+------------+------------------------+-----------+-------------------------+
|vehicleID   |startDateTimeUtc        |Odometer   |NewColumn-CurrentOdometer|
+------------+------------------------+-----------+-------------------------+
|a           |2019-04-11T16:27:32+0000|10000      |10000                    |
|a           |2019-04-11T16:27:32+0000|15000      |15000                    |
|a           |2019-04-11T16:43:10+0000|null       |15000                    |
|a           |2019-04-11T20:13:52+0000|null       |15000                    |
|a           |2019-04-12T14:50:35+0000|null       |15000                    |
|a           |2019-04-12T18:53:19+0000|20000      |20000                    |
|b           |2019-04-12T19:06:41+0000|350000     |350000                   |
|b           |2019-04-12T19:17:15+0000|370000     |370000                   |
|b           |2019-04-12T19:30:32+0000|null       |370000                   |
|b           |2019-04-12T20:19:41+0000|380000     |380000                   |
|b           |2019-04-12T20:42:26+0000|null       |380000                   |
+------------+------------------------+-----------+-------------------------+

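I know I need a window function, and maybe `lag` as well, but how do I look back further than just the previous record? (See vehicle ID a in the example.) Thanks a lot!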
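To see why a single `lag` is not enough, here is a plain-Python sketch (not Spark code; the helper names `fill_with_lag` and `forward_fill` are hypothetical) applied to vehicle a's readings in timestamp order. Looking back one row fills only the first of the three consecutive nulls, whereas the desired column carries the last non-null value forward no matter how many nulls follow.

```python
from typing import List, Optional


def fill_with_lag(readings: List[Optional[int]]) -> List[Optional[int]]:
    """Fill each null with the immediately previous raw value only (what lag(1) sees)."""
    out = []
    for i, r in enumerate(readings):
        prev = readings[i - 1] if i > 0 else None
        out.append(r if r is not None else prev)
    return out


def forward_fill(readings: List[Optional[int]]) -> List[Optional[int]]:
    """Fill each null with the most recent non-null reading seen so far."""
    out = []
    last_seen = None
    for r in readings:
        if r is not None:
            last_seen = r
        out.append(last_seen)
    return out


# Odometer readings for vehicle "a", already sorted by startDateTimeUtc
vehicle_a = [10000, 15000, None, None, None, 20000]
print(fill_with_lag(vehicle_a))  # [10000, 15000, 15000, None, None, 20000]
print(forward_fill(vehicle_a))   # [10000, 15000, 15000, 15000, 15000, 20000]
```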

from pyspark.sql import Window

my_window = Window.partitionBy("vehicleID").orderBy("vehicleID","startDateTimeUtc")

Use the `last` window function with its `ignorenulls` flag set to True, over a frame of rows between `unboundedPreceding` and `currentRow`:

df.show(20,False)
#+---------+------------------------+--------+
#|vehicleid|startdatetimeutc        |odometer|
#+---------+------------------------+--------+
#|a        |2019-04-11T16:27:32+0000|10000   |
#|a        |2019-04-11T16:27:32+0000|15000   |
#|a        |2019-04-11T16:43:10+0000|null    |
#|a        |2019-04-11T20:13:52+0000|null    |
#|a        |2019-04-12T14:50:35+0000|null    |
#|a        |2019-04-12T18:53:19+0000|20000   |
#|b        |2019-04-12T19:06:41+0000|350000  |
#|b        |2019-04-12T19:17:15+0000|370000  |
#|b        |2019-04-12T19:30:32+0000|null    |
#|b        |2019-04-12T20:19:41+0000|380000  |
#|b        |2019-04-12T20:42:26+0000|null    |
#+---------+------------------------+--------+
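from pyspark.sql import Window
from pyspark.sql.functions import col, last

# Per vehicle, ordered by timestamp; the frame spans the partition's first row through the current row
my_window = Window.partitionBy("vehicleID").orderBy("startDateTimeUtc").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# ignorenulls=True makes last() return the most recent non-null Odometer within that frame
df.withColumn("NewColumn-CurrentOdometer", last(col("Odometer"), ignorenulls=True).over(my_window)).orderBy("vehicleid").show(20, False)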
#+---------+------------------------+--------+-------------------------+
#|vehicleid|startdatetimeutc        |odometer|NewColumn-CurrentOdometer|
#+---------+------------------------+--------+-------------------------+
#|a        |2019-04-11T16:27:32+0000|10000   |10000                    |
#|a        |2019-04-11T16:27:32+0000|15000   |15000                    |
#|a        |2019-04-11T16:43:10+0000|null    |15000                    |
#|a        |2019-04-11T20:13:52+0000|null    |15000                    |
#|a        |2019-04-12T14:50:35+0000|null    |15000                    |
#|a        |2019-04-12T18:53:19+0000|20000   |20000                    |
#|b        |2019-04-12T19:06:41+0000|350000  |350000                   |
#|b        |2019-04-12T19:17:15+0000|370000  |370000                   |
#|b        |2019-04-12T19:30:32+0000|null    |370000                   |
#|b        |2019-04-12T20:19:41+0000|380000  |380000                   |
#|b        |2019-04-12T20:42:26+0000|null    |380000                   |
#+---------+------------------------+--------+-------------------------+
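The frame semantics above can be modeled in plain Python as a sketch: group rows by `vehicleID`, sort each group by `startDateTimeUtc`, and carry the most recent non-null `Odometer` forward, which is what `last(..., ignorenulls=True)` over a frame from `unboundedPreceding` to `currentRow` yields per row. The function name `last_non_null_over_window` is hypothetical, and the ordering of tied timestamps is an assumption: Python's `sorted` is stable, but Spark makes no such ordering promise for the two vehicle a rows that share `2019-04-11T16:27:32+0000`.

```python
from itertools import groupby
from typing import List, Optional, Tuple

Row = Tuple[str, str, Optional[int]]  # (vehicleID, startDateTimeUtc, Odometer)


def last_non_null_over_window(rows: List[Row]) -> List[Tuple[str, str, Optional[int], Optional[int]]]:
    result = []
    # partitionBy("vehicleID") + orderBy("startDateTimeUtc"); ties keep input order here (stable sort)
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    for _, partition in groupby(ordered, key=lambda r: r[0]):
        frame_last = None  # last non-null value in the frame [unboundedPreceding, currentRow]
        for vid, ts, odo in partition:
            if odo is not None:
                frame_last = odo
            result.append((vid, ts, odo, frame_last))
    return result


rows = [
    ("a", "2019-04-11T16:27:32+0000", 10000),
    ("a", "2019-04-11T16:27:32+0000", 15000),
    ("a", "2019-04-11T16:43:10+0000", None),
    ("a", "2019-04-11T20:13:52+0000", None),
    ("a", "2019-04-12T14:50:35+0000", None),
    ("a", "2019-04-12T18:53:19+0000", 20000),
    ("b", "2019-04-12T19:06:41+0000", 350000),
    ("b", "2019-04-12T19:17:15+0000", 370000),
    ("b", "2019-04-12T19:30:32+0000", None),
    ("b", "2019-04-12T20:19:41+0000", 380000),
    ("b", "2019-04-12T20:42:26+0000", None),
]

for row in last_non_null_over_window(rows):
    print(row)
```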

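Another option: use `max` over a window frame from `unboundedPreceding` to `currentRow`. Because odometer readings normally only increase, the running maximum equals the most recent non-null reading.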

Load the provided test data:

import spark.implicits._ // provides toDS(); spark-shell imports this automatically

val data =
"""
|vehicleID   |startDateTimeUtc        |Odometer
|a           |2019-04-11T16:27:32+0000|10000
|a           |2019-04-11T16:27:32+0000|15000
|a           |2019-04-11T16:43:10+0000|null
|a           |2019-04-11T20:13:52+0000|null
|a           |2019-04-12T14:50:35+0000|null
|a           |2019-04-12T18:53:19+0000|20000
|b           |2019-04-12T19:06:41+0000|350000
|b           |2019-04-12T19:17:15+0000|370000
|b           |2019-04-12T19:30:32+0000|null
|b           |2019-04-12T20:19:41+0000|380000
|b           |2019-04-12T20:42:26+0000|null
""".stripMargin
// Turn each pipe-delimited line into a trimmed, comma-separated CSV line
val stringDS1 = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +---------+------------------------+--------+
* |vehicleID|startDateTimeUtc        |Odometer|
* +---------+------------------------+--------+
* |a        |2019-04-11T16:27:32+0000|10000   |
* |a        |2019-04-11T16:27:32+0000|15000   |
* |a        |2019-04-11T16:43:10+0000|null    |
* |a        |2019-04-11T20:13:52+0000|null    |
* |a        |2019-04-12T14:50:35+0000|null    |
* |a        |2019-04-12T18:53:19+0000|20000   |
* |b        |2019-04-12T19:06:41+0000|350000  |
* |b        |2019-04-12T19:17:15+0000|370000  |
* |b        |2019-04-12T19:30:32+0000|null    |
* |b        |2019-04-12T20:19:41+0000|380000  |
* |b        |2019-04-12T20:42:26+0000|null    |
* +---------+------------------------+--------+
*
* root
* |-- vehicleID: string (nullable = true)
* |-- startDateTimeUtc: string (nullable = true)
* |-- Odometer: integer (nullable = true)
*/
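The loading step above turns the pipe-delimited table into CSV lines before handing them to the CSV reader. A rough plain-Python equivalent of that string handling, assuming the same layout (a `|` margin at the start of each line, cells separated by `|`), might look like this; `pipe_table_to_csv` is a hypothetical helper, and unlike the Scala code it drops blank lines itself instead of leaving them to the reader.

```python
from typing import List


def pipe_table_to_csv(text: str) -> List[str]:
    csv_lines = []
    for raw in text.splitlines():
        # Mimic stripMargin: drop leading blanks followed by the '|' margin character
        trimmed = raw.lstrip(" \t")
        line = trimmed[1:] if trimmed.startswith("|") else raw
        if not line.strip():
            continue  # skip blank lines, such as the one right after the opening quotes
        # Split on '|', trim spaces and tabs around each cell, then join with commas
        cells = [cell.strip(" \t") for cell in line.split("|")]
        csv_lines.append(",".join(cells))
    return csv_lines


sample = """
|vehicleID   |startDateTimeUtc        |Odometer
|a           |2019-04-11T16:27:32+0000|10000
|a           |2019-04-11T16:43:10+0000|null
"""
for line in pipe_table_to_csv(sample):
    print(line)
```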

Compute the maximum over a window ordered by timestamp:

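import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// Per vehicle, ordered by timestamp; the frame covers every row from the start of the partition up to the current row
val w = Window.partitionBy("vehicleID").orderBy("startDateTimeUtc")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
// max ignores nulls, so rows with a null Odometer get the largest reading seen so far
df1.withColumn("NewColumn-CurrentOdometer",
max("Odometer").over(w))
.show(false)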
/**
* +---------+------------------------+--------+-------------------------+
* |vehicleID|startDateTimeUtc        |Odometer|NewColumn-CurrentOdometer|
* +---------+------------------------+--------+-------------------------+
* |a        |2019-04-11T16:27:32+0000|10000   |10000                    |
* |a        |2019-04-11T16:27:32+0000|15000   |15000                    |
* |a        |2019-04-11T16:43:10+0000|null    |15000                    |
* |a        |2019-04-11T20:13:52+0000|null    |15000                    |
* |a        |2019-04-12T14:50:35+0000|null    |15000                    |
* |a        |2019-04-12T18:53:19+0000|20000   |20000                    |
* |b        |2019-04-12T19:06:41+0000|350000  |350000                   |
* |b        |2019-04-12T19:17:15+0000|370000  |370000                   |
* |b        |2019-04-12T19:30:32+0000|null    |370000                   |
* |b        |2019-04-12T20:19:41+0000|380000  |380000                   |
* |b        |2019-04-12T20:42:26+0000|null    |380000                   |
* +---------+------------------------+--------+-------------------------+
*/
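The `max` approach relies on odometer readings never decreasing. A plain-Python sketch comparing a running maximum (modeling `max` over the window, which skips nulls) with last-non-null forward filling shows that the two agree on vehicle b's data but diverge as soon as a reading drops. The decreasing sequence below is hypothetical data, not from the question, and the helper names are illustrative.

```python
from typing import List, Optional


def running_max(readings: List[Optional[int]]) -> List[Optional[int]]:
    """Largest non-null value seen so far (None until the first non-null)."""
    out, best = [], None
    for r in readings:
        if r is not None and (best is None or r > best):
            best = r
        out.append(best)
    return out


def last_non_null(readings: List[Optional[int]]) -> List[Optional[int]]:
    """Most recent non-null value seen so far (None until the first non-null)."""
    out, last_seen = [], None
    for r in readings:
        if r is not None:
            last_seen = r
        out.append(last_seen)
    return out


# Vehicle "b" from the question: readings only increase, so both methods agree
vehicle_b = [350000, 370000, None, 380000, None]
print(running_max(vehicle_b) == last_non_null(vehicle_b))  # True

# Hypothetical reading that drops (e.g. a replaced odometer or bad data)
dropped = [350000, 370000, None, 1200, None]
print(running_max(dropped))    # [350000, 370000, 370000, 370000, 370000]
print(last_non_null(dropped))  # [350000, 370000, 370000, 1200, 1200]
```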
