I have a DataFrame with the following schema:
root
|-- eventTimestamp: long
|-- trackingId: string
|-- voyageStatus: string
Here are some sample rows:
+--------------+----------+------------+
|eventTimestamp|trackingId|voyageStatus|
+--------------+----------+------------+
| 504 |78911c81 |COMPLETE |
| 504 |3b77a150 |ACTIVE |
| 390 |ece6c8d0 |ACTIVE |
| 390 |78911c81 |ACTIVE |
| 349 |3b77a150 |ACTIVE |
| 349 |ece6c8d0 |ACTIVE |
| 349 |78911c81 |ACTIVE |
| 350 |3b77a150 |ACTIVE |
| 350 |ece6c8d0 |ACTIVE |
| 350 |78911c81 |ACTIVE |
| 351 |3b77a150 |ACTIVE |
| 351 |ece6c8d0 |ACTIVE |
| 351 |78911c81 |ACTIVE |
| 352 |3b77a150 |ACTIVE |
| 352 |ece6c8d0 |ACTIVE |
| 352 |78911c81 |ACTIVE |
| 507 |3b77a150 |COMPLETE |
| 349 |ece6c8d0 |ACTIVE |
| 349 |78911c81 |ACTIVE |
| 349 |3b77a150 |ACTIVE |
+--------------+----------+------------+
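I want to add a new column of type long called completionEventTimestamp. For each row, this column will have the following value:
- If there is a record with the same trackingId value as the current row whose voyageStatus equals "COMPLETE", the value is that record's eventTimestamp.
- Otherwise, the value is -1 (so that the row can be filtered out later).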
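To pin the rule down before touching Spark, here is a minimal plain-Scala sketch of the same logic on an in-memory collection. The `Event` case class and the `CompletionRule` object are hypothetical helpers that only mirror the three columns of the schema; the sketch assumes at most one COMPLETE record per trackingId, as stated later in the question.

```scala
// Hypothetical row type mirroring the DataFrame schema (not part of the original question)
final case class Event(eventTimestamp: Long, trackingId: String, voyageStatus: String)

object CompletionRule {
  // Pairs every event with its completionEventTimestamp according to the rule above
  def completionTimestamps(events: Seq[Event]): Seq[(Event, Long)] = {
    // trackingId -> eventTimestamp of that trackingId's COMPLETE record
    val completedAt: Map[String, Long] =
      events.collect { case Event(ts, id, "COMPLETE") => id -> ts }.toMap
    // Rows whose trackingId never reached COMPLETE get the -1 sentinel
    events.map(e => e -> completedAt.getOrElse(e.trackingId, -1L))
  }
}
```

Such a reference function can be handy for checking a DataFrame result on small samples.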
Here is what the transformation should produce for the example above:
+--------------+----------+------------+------------------------+
|eventTimestamp|trackingId|voyageStatus|completionEventTimestamp|
+--------------+----------+------------+------------------------+
| 504 |78911c81 |COMPLETE | 504|
| 504 |3b77a150 |ACTIVE | 507|
| 390 |ece6c8d0 |ACTIVE | -1|
| 390 |78911c81 |ACTIVE | 504|
| 349 |3b77a150 |ACTIVE | 507|
| 349 |ece6c8d0 |ACTIVE | -1|
| 349 |78911c81 |ACTIVE | 504|
| 350 |3b77a150 |ACTIVE | 507|
| 350 |ece6c8d0 |ACTIVE | -1|
| 350 |78911c81 |ACTIVE | 504|
| 351 |3b77a150 |ACTIVE | 507|
| 351 |ece6c8d0 |ACTIVE | -1|
| 351 |78911c81 |ACTIVE | 504|
| 352 |3b77a150 |ACTIVE | 507|
| 352 |ece6c8d0 |ACTIVE | -1|
| 352 |78911c81 |ACTIVE | 504|
| 507 |3b77a150 |COMPLETE | 507|
| 349 |ece6c8d0 |ACTIVE | -1|
| 349 |78911c81 |ACTIVE | 504|
| 349 |3b77a150 |ACTIVE | 507|
+--------------+----------+------------+------------------------+
If it helps at all: if a record for a given trackingId has a voyageStatus of "COMPLETE", it will be the last record for that trackingId (when ordered by eventTimestamp), and there will be only one such record.
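Because there is at most one COMPLETE record per trackingId, one possible approach (not taken from the answer below) is a window aggregate with no join at all: `max` over a partition by trackingId, applied to a column that is non-null only on the COMPLETE row. The object and method names below are hypothetical; this is a sketch, not a tuned implementation.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, max, when}

object CompletionWindow {
  def withCompletion(df: DataFrame): DataFrame = {
    // No orderBy, so the frame is the whole trackingId partition
    val byTrackingId = Window.partitionBy("trackingId")
    // when(...) without otherwise() is null on non-COMPLETE rows, and max ignores nulls,
    // so this is the COMPLETE row's eventTimestamp, or null if the partition has none
    val completeTs =
      max(when(col("voyageStatus") === "COMPLETE", col("eventTimestamp"))).over(byTrackingId)
    // Fall back to the -1 sentinel for trackingIds that never completed
    df.withColumn("completionEventTimestamp", coalesce(completeTs, lit(-1L)))
  }
}
```

If a trackingId ever had more than one COMPLETE row, `max` would pick the latest one, which matches the "last record" remark above.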
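import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// training3 (input DataFrame) and LABEL_COL_NAME (output column name) are defined elsewhere.
// Collect trackingId -> eventTimestamp of every COMPLETE record to the driver
val completedVoyagesDF = training3.filter(training3("voyageStatus") === "COMPLETE").select("trackingId", "eventTimestamp")
val completedVoyagesArray = completedVoyagesDF.collect().map((row: Row) =>
  row.getString(0) -> row.getLong(1)
)
val trackingIDToActualArrivalTime = completedVoyagesArray.toMap
// Look up each row's completion time, defaulting to -1 when the voyage never completed
val arrivalTime: String => Long = (trackingId: String) =>
  trackingIDToActualArrivalTime.getOrElse(trackingId, -1L)
val arrivalTimeFunc = udf(arrivalTime)
val withActualArrivalTimeDF = training3.withColumn(LABEL_COL_NAME, arrivalTimeFunc(col("trackingId")))
// Drop the rows whose trackingId has no COMPLETE record
val training4 = withActualArrivalTimeDF.filter(withActualArrivalTimeDF(LABEL_COL_NAME) =!= -1)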
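Since the code above ultimately keeps only the rows that have a completion time, the collect, UDF, and `=!= -1` filter can arguably be folded into a single inner join, which keeps everything on the executors. This is a sketch; `CompletionJoin`, `completedOnly`, and the `labelCol` parameter (standing in for `LABEL_COL_NAME`) are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col}

object CompletionJoin {
  // Keeps only rows whose trackingId has a COMPLETE record and attaches that record's timestamp
  def completedOnly(df: DataFrame, labelCol: String): DataFrame = {
    val completed = df
      .filter(col("voyageStatus") === "COMPLETE")
      .select(col("trackingId"), col("eventTimestamp").as(labelCol))
    // Inner join (the default) drops non-completed trackingIds, so no -1 sentinel is needed.
    // broadcast() is only a hint, assuming the completed set is small enough to ship to executors.
    df.join(broadcast(completed), Seq("trackingId"))
  }
}
```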
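You can use collect_list over a window partitioned by trackingId to hold the list of statuses for each trackingId, then join with the COMPLETE rows and use a UDF to assign the value of completionEventTimestamp, as shown below: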
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // for toDF and $"..."; assumes a SparkSession named spark (automatic in spark-shell)

val df = Seq(
(504L, 10, "ACTIVE"),
(506L, 10, "ACTIVE"),
(510L, 10, "COMPLETE"),
(390L, 11, "ACTIVE"),
(395L, 11, "ACTIVE"),
(398L, 11, "ACTIVE"),
(352L, 12, "ACTIVE"),
(360L, 12, "COMPLETE")
).toDF("eventTimestamp", "trackingId", "voyageStatus")
// Save "completeTimestamp" for every row with COMPLETE status
val df2 = df.select(
$"eventTimestamp".as("completeTimestamp"), $"trackingId"
).where(df("voyageStatus") === "COMPLETE")
// Create a "statusList" per "trackingId" for each row using collect_list over window partitions
val window = Window.partitionBy("trackingId")
val df3 = df.withColumn("statusList", collect_list("voyageStatus").over(window))
// A UDF to check whether statusList contains "COMPLETE"
val checkComplete = udf(
(l: Seq[String]) => l.contains("COMPLETE")
)
// Join df3 with df2 and apply the UDF to assemble "completionEventTimestamp"
val df4 = df3.join(df2, Seq("trackingId"), "left_outer").
withColumn(
"completionEventTimestamp",
when(checkComplete($"statusList"), $"completeTimestamp").otherwise(-1L)
).select(
"eventTimestamp", "trackingId", "voyageStatus", "completionEventTimestamp"
)
df4.show
+--------------+----------+------------+------------------------+
|eventTimestamp|trackingId|voyageStatus|completionEventTimestamp|
+--------------+----------+------------+------------------------+
| 352| 12| ACTIVE| 360|
| 360| 12| COMPLETE| 360|
| 504| 10| ACTIVE| 510|
| 506| 10| ACTIVE| 510|
| 510| 10| COMPLETE| 510|
| 390| 11| ACTIVE| -1|
| 395| 11| ACTIVE| -1|
| 398| 11| ACTIVE| -1|
+--------------+----------+------------+------------------------+
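A possible simplification of the answer's code: after the left outer join, completeTimestamp is already null exactly when the trackingId has no COMPLETE row, so `coalesce` can supply the -1 default without collect_list or a UDF. The object and method names in this sketch are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, lit}

object CompletionLeftJoin {
  def withCompletion(df: DataFrame): DataFrame = {
    // Same lookup table as df2 in the answer: one row per completed trackingId
    val completed = df
      .filter(col("voyageStatus") === "COMPLETE")
      .select(col("trackingId"), col("eventTimestamp").as("completeTimestamp"))
    df.join(completed, Seq("trackingId"), "left_outer")
      // Null completeTimestamp means "never completed", so default it to -1
      .withColumn("completionEventTimestamp", coalesce(col("completeTimestamp"), lit(-1L)))
      .select("eventTimestamp", "trackingId", "voyageStatus", "completionEventTimestamp")
  }
}
```

Avoiding the UDF also lets Catalyst see the whole expression, which usually optimizes better than an opaque function.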