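I have a dataframe with a vehicle ID, a timestamp, and an odometer reading. Some odometer readings may be null. I want to create a new column that holds the current odometer for each vehicle ID at each timestamp; if the reading is null, it should use the previous non-null odometer reading.
Example: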
+------------+------------------------+-----------+-------------------------+
|vehicleID |startDateTimeUtc |Odometer |NewColumn-CurrentOdometer|
+------------+------------------------+-----------+-------------------------+
|a |2019-04-11T16:27:32+0000|10000 |10000 |
|a |2019-04-11T16:27:32+0000|15000 |15000 |
|a |2019-04-11T16:43:10+0000|null |15000 |
|a |2019-04-11T20:13:52+0000|null |15000 |
|a |2019-04-12T14:50:35+0000|null |15000 |
|a |2019-04-12T18:53:19+0000|20000 |20000 |
|b |2019-04-12T19:06:41+0000|350000 |350000 |
|b |2019-04-12T19:17:15+0000|370000 |370000 |
|b |2019-04-12T19:30:32+0000|null |370000 |
|b |2019-04-12T20:19:41+0000|380000 |380000 |
|b |2019-04-12T20:42:26+0000|null |380000 |
+------------+------------------------+-----------+-------------------------+
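I know I need to use a window function. I probably also need to use `lag`, but how do I look back further than just the previous record? (See vehicle ID a in the example.) Thanks a lot!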
my_window = Window.partitionBy("vehicleID").orderBy("vehicleID","startDateTimeUtc")
Use the `last` window function with the `ignoreNulls` flag set to `True`, over a frame of rows between `unboundedPreceding` and `currentRow`.
df.show(20,False)
#+---------+------------------------+--------+
#|vehicleid|startdatetimeutc |odometer|
#+---------+------------------------+--------+
#|a |2019-04-11T16:27:32+0000|10000 |
#|a |2019-04-11T16:27:32+0000|15000 |
#|a |2019-04-11T16:43:10+0000|null |
#|a |2019-04-11T20:13:52+0000|null |
#|a |2019-04-12T14:50:35+0000|null |
#|a |2019-04-12T18:53:19+0000|20000 |
#|b |2019-04-12T19:06:41+0000|350000 |
#|b |2019-04-12T19:17:15+0000|370000 |
#|b |2019-04-12T19:30:32+0000|null |
#|b |2019-04-12T20:19:41+0000|380000 |
#|b |2019-04-12T20:42:26+0000|null |
#+---------+------------------------+--------+
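from pyspark.sql import Window
from pyspark.sql.functions import col, last
# Frame: every row from the start of the partition up to and including the current row
my_window = Window.partitionBy("vehicleID").orderBy("startDateTimeUtc").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# last(..., True) ignores nulls, so each row gets the most recent non-null Odometer
df.withColumn("NewColumn-CurrentOdometer",last(col("Odometer"),True).over(my_window)).orderBy("vehicleid").show(20,False)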
#+---------+------------------------+--------+-------------------------+
#|vehicleid|startdatetimeutc |odometer|NewColumn-CurrentOdometer|
#+---------+------------------------+--------+-------------------------+
#|a |2019-04-11T16:27:32+0000|10000 |10000 |
#|a |2019-04-11T16:27:32+0000|15000 |15000 |
#|a |2019-04-11T16:43:10+0000|null |15000 |
#|a |2019-04-11T20:13:52+0000|null |15000 |
#|a |2019-04-12T14:50:35+0000|null |15000 |
#|a |2019-04-12T18:53:19+0000|20000 |20000 |
#|b |2019-04-12T19:06:41+0000|350000 |350000 |
#|b |2019-04-12T19:17:15+0000|370000 |370000 |
#|b |2019-04-12T19:30:32+0000|null |370000 |
#|b |2019-04-12T20:19:41+0000|380000 |380000 |
#|b |2019-04-12T20:42:26+0000|null |380000 |
#+---------+------------------------+--------+-------------------------+
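To make the semantics of that window concrete, here is a minimal plain-Python sketch of what `last` with nulls ignored over an unbounded-preceding frame computes per partition. It does not use Spark; the function name `forward_fill` and the tuple layout are assumptions made for illustration, and only a subset of the sample rows is used.

```python
from itertools import groupby
from operator import itemgetter


def forward_fill(rows):
    """Per vehicle, carry the last non-null odometer forward in time order.

    rows: iterable of (vehicle_id, timestamp, odometer_or_None) tuples.
    Returns a list of (vehicle_id, timestamp, odometer, filled) tuples.
    """
    out = []
    # sorted() is stable, so rows with equal (vehicle, timestamp) keep input order
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, group in groupby(ordered, key=itemgetter(0)):
        current = None  # nothing seen yet in this partition
        for vid, ts, odo in group:
            if odo is not None:
                current = odo  # a real reading replaces the carried value
            out.append((vid, ts, odo, current))
    return out


rows = [
    ("a", "2019-04-11T16:27:32+0000", 10000),
    ("a", "2019-04-11T16:27:32+0000", 15000),
    ("a", "2019-04-11T16:43:10+0000", None),
    ("a", "2019-04-12T18:53:19+0000", 20000),
    ("b", "2019-04-12T19:17:15+0000", 370000),
    ("b", "2019-04-12T19:30:32+0000", None),
]
filled = [r[3] for r in forward_fill(rows)]
print(filled)  # [10000, 15000, 15000, 20000, 370000, 370000]
```

Note that the sample data has two rows for vehicle a with an identical timestamp. A sort on the timestamp alone does not define which of them comes first, so if that order matters a tie-breaking column may be needed.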
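Another option: use `max` with the window frame `unboundedPreceding` and `currentRow`. This gives the expected result only because odometer readings never decrease over time, so the running maximum equals the most recent non-null reading.
Load the provided test data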
val data =
"""
|vehicleID |startDateTimeUtc |Odometer
|a |2019-04-11T16:27:32+0000|10000
|a |2019-04-11T16:27:32+0000|15000
|a |2019-04-11T16:43:10+0000|null
|a |2019-04-11T20:13:52+0000|null
|a |2019-04-12T14:50:35+0000|null
|a |2019-04-12T18:53:19+0000|20000
|b |2019-04-12T19:06:41+0000|350000
|b |2019-04-12T19:17:15+0000|370000
|b |2019-04-12T19:30:32+0000|null
|b |2019-04-12T20:19:41+0000|380000
|b |2019-04-12T20:42:26+0000|null
""".stripMargin
val stringDS1 = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +---------+------------------------+--------+
* |vehicleID|startDateTimeUtc |Odometer|
* +---------+------------------------+--------+
* |a |2019-04-11T16:27:32+0000|10000 |
* |a |2019-04-11T16:27:32+0000|15000 |
* |a |2019-04-11T16:43:10+0000|null |
* |a |2019-04-11T20:13:52+0000|null |
* |a |2019-04-12T14:50:35+0000|null |
* |a |2019-04-12T18:53:19+0000|20000 |
* |b |2019-04-12T19:06:41+0000|350000 |
* |b |2019-04-12T19:17:15+0000|370000 |
* |b |2019-04-12T19:30:32+0000|null |
* |b |2019-04-12T20:19:41+0000|380000 |
* |b |2019-04-12T20:42:26+0000|null |
* +---------+------------------------+--------+
*
* root
* |-- vehicleID: string (nullable = true)
* |-- startDateTimeUtc: string (nullable = true)
* |-- Odometer: integer (nullable = true)
*/
Compute the maximum over a window ordered by `startDateTimeUtc`
val w = Window.partitionBy("vehicleID").orderBy("startDateTimeUtc")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df1.withColumn("NewColumn-CurrentOdometer",
max("Odometer").over(w))
.show(false)
/**
* +---------+------------------------+--------+-------------------------+
* |vehicleID|startDateTimeUtc |Odometer|NewColumn-CurrentOdometer|
* +---------+------------------------+--------+-------------------------+
* |a |2019-04-11T16:27:32+0000|10000 |10000 |
* |a |2019-04-11T16:27:32+0000|15000 |15000 |
* |a |2019-04-11T16:43:10+0000|null |15000 |
* |a |2019-04-11T20:13:52+0000|null |15000 |
* |a |2019-04-12T14:50:35+0000|null |15000 |
* |a |2019-04-12T18:53:19+0000|20000 |20000 |
* |b |2019-04-12T19:06:41+0000|350000 |350000 |
* |b |2019-04-12T19:17:15+0000|370000 |370000 |
* |b |2019-04-12T19:30:32+0000|null |370000 |
* |b |2019-04-12T20:19:41+0000|380000 |380000 |
* |b |2019-04-12T20:42:26+0000|null |380000 |
* +---------+------------------------+--------+-------------------------+
*/
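The running `max` and the last non-null value agree on the sample data, but they are not the same operation in general. The plain-Python sketch below compares the two on an increasing series and on a hypothetical series where a later reading is lower (for example after a correction). The helper names and the second series are assumptions for illustration, not part of the original data.

```python
def running_max(values):
    """Running maximum that skips None, like max() over unboundedPreceding..currentRow."""
    out, best = [], None
    for v in values:
        if v is not None and (best is None or v > best):
            best = v
        out.append(best)
    return out


def last_non_null(values):
    """Most recent non-None value so far, like last() with nulls ignored."""
    out, current = [], None
    for v in values:
        if v is not None:
            current = v
        out.append(current)
    return out


increasing = [10000, 15000, None, None, 20000]
corrected = [15000, None, 14000, None]  # hypothetical: a later reading is lower

print(running_max(increasing) == last_non_null(increasing))  # True
print(running_max(corrected))    # [15000, 15000, 15000, 15000]
print(last_non_null(corrected))  # [15000, 15000, 14000, 14000]
```

If readings can ever go down, the `last` approach with nulls ignored is probably the safer choice, since it does not depend on the values being monotonic.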