Spark DataFrame transformation



I have a DataFrame with the following schema:

root
 |-- eventTimestamp: long 
 |-- trackingId: string 
 |-- voyageStatus: string 

Here are some sample rows:

+--------------+----------+------------+
|eventTimestamp|trackingId|voyageStatus|
+--------------+----------+------------+
|          504 |78911c81  |COMPLETE    |
|          504 |3b77a150  |ACTIVE      |
|          390 |ece6c8d0  |ACTIVE      |
|          390 |78911c81  |ACTIVE      |
|          349 |3b77a150  |ACTIVE      |
|          349 |ece6c8d0  |ACTIVE      |
|          349 |78911c81  |ACTIVE      |
|          350 |3b77a150  |ACTIVE      |
|          350 |ece6c8d0  |ACTIVE      |
|          350 |78911c81  |ACTIVE      |
|          351 |3b77a150  |ACTIVE      |
|          351 |ece6c8d0  |ACTIVE      |
|          351 |78911c81  |ACTIVE      |
|          352 |3b77a150  |ACTIVE      |
|          352 |ece6c8d0  |ACTIVE      |
|          352 |78911c81  |ACTIVE      |
|          507 |3b77a150  |COMPLETE    |
|          349 |ece6c8d0  |ACTIVE      |
|          349 |78911c81  |ACTIVE      |
|          349 |3b77a150  |ACTIVE      |
+--------------+----------+------------+

I want to add a new column named completionEventTimestamp of type long. For each row, this column will have the following value:

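  1. If there is a record with the same trackingId as the current row whose voyageStatus equals "COMPLETE", the value is that record's eventTimestamp.
  2. Otherwise, the value is -1 (so the row can be filtered out later).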
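
To make the rule concrete, here is a minimal sketch on plain Scala collections rather than Spark. The VoyageEvent case class and the helper names completionTimestamps and withCompletion are hypothetical, introduced only for illustration, and the sketch assumes at most one COMPLETE record per trackingId.

```scala
// Hypothetical in-memory model of one row; field names mirror the schema above.
final case class VoyageEvent(eventTimestamp: Long, trackingId: String, voyageStatus: String)

// Map each trackingId that has a COMPLETE record to that record's timestamp.
def completionTimestamps(events: Seq[VoyageEvent]): Map[String, Long] =
  events
    .filter(_.voyageStatus == "COMPLETE")
    .map(e => e.trackingId -> e.eventTimestamp)
    .toMap

// Pair every row with its completion timestamp, or -1 when the voyage never completed.
def withCompletion(events: Seq[VoyageEvent]): Seq[(VoyageEvent, Long)] = {
  val completed = completionTimestamps(events)
  events.map(e => e -> completed.getOrElse(e.trackingId, -1L))
}

// A few rows taken from the sample table above.
val sample = Seq(
  VoyageEvent(504L, "78911c81", "COMPLETE"),
  VoyageEvent(390L, "ece6c8d0", "ACTIVE"),
  VoyageEvent(390L, "78911c81", "ACTIVE"),
  VoyageEvent(507L, "3b77a150", "COMPLETE"),
  VoyageEvent(349L, "3b77a150", "ACTIVE")
)

val labelled = withCompletion(sample)
```

Rows for ece6c8d0 get -1 because that trackingId has no COMPLETE record, which matches the expected output shown for the full sample.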

Here is what the transformation would produce for the sample above:

+--------------+----------+------------+------------------------+
|eventTimestamp|trackingId|voyageStatus|completionEventTimestamp|
+--------------+----------+------------+------------------------+
|          504 |78911c81  |COMPLETE    |                     504|
|          504 |3b77a150  |ACTIVE      |                     507|
|          390 |ece6c8d0  |ACTIVE      |                      -1|
|          390 |78911c81  |ACTIVE      |                     504|
|          349 |3b77a150  |ACTIVE      |                     507|
|          349 |ece6c8d0  |ACTIVE      |                      -1|
|          349 |78911c81  |ACTIVE      |                     504|
|          350 |3b77a150  |ACTIVE      |                     507|
|          350 |ece6c8d0  |ACTIVE      |                      -1|
|          350 |78911c81  |ACTIVE      |                     504|
|          351 |3b77a150  |ACTIVE      |                     507|
|          351 |ece6c8d0  |ACTIVE      |                      -1|
|          351 |78911c81  |ACTIVE      |                     504|
|          352 |3b77a150  |ACTIVE      |                     507|
|          352 |ece6c8d0  |ACTIVE      |                      -1|
|          352 |78911c81  |ACTIVE      |                     504|
|          507 |3b77a150  |COMPLETE    |                     507|
|          349 |ece6c8d0  |ACTIVE      |                      -1|
|          349 |78911c81  |ACTIVE      |                     504|
|          349 |3b77a150  |ACTIVE      |                     507|
+--------------+----------+------------+------------------------+

In case it helps: if a record for a given trackingId has voyageStatus "COMPLETE", it will be the last record for that trackingId (when ordered by eventTimestamp), and there will be only one such record.

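import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Assumed value: LABEL_COL_NAME is not defined in the snippet, so the column described above is used
val LABEL_COL_NAME = "completionEventTimestamp"

// Collect (trackingId -> eventTimestamp) for every COMPLETE record onto the driver
val completedVoyagesDF = training3.filter(training3("voyageStatus") === "COMPLETE").select("trackingId", "eventTimestamp")
val completedVoyagesArray = completedVoyagesDF.collect().map({
  row: Row => row.getString(0) -> row.getLong(1)
})
val trackingIDToActualArrivalTime = completedVoyagesArray.toMap
// Look up the completion time for a trackingId, defaulting to -1 when there is none
val arrivalTime: (String => Long) = (trackingId: String) => {
  trackingIDToActualArrivalTime.getOrElse(trackingId, -1L)
}
val arrivalTimeFunc = udf(arrivalTime)
val withActualArrivalTimeDF = training3.withColumn(LABEL_COL_NAME, arrivalTimeFunc(col("trackingId")))
// Drop rows whose voyage never completed
val training4 = withActualArrivalTimeDF.filter(withActualArrivalTimeDF(LABEL_COL_NAME) =!= -1)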

You can use collect_list over a window partition to keep the list of statuses for each trackingId, together with a UDF, to assign the value of completionEventTimestamp, as follows:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// toDF and the $"..." syntax need spark.implicits._, which spark-shell imports automatically

val df = Seq(
  (504L, 10, "ACTIVE"),
  (506L, 10, "ACTIVE"),
  (510L, 10, "COMPLETE"),
  (390L, 11, "ACTIVE"),
  (395L, 11, "ACTIVE"),
  (398L, 11, "ACTIVE"),
  (352L, 12, "ACTIVE"),
  (360L, 12, "COMPLETE")
).toDF("eventTimestamp", "trackingId", "voyageStatus")
// Save "completeTimestamp" for every row with COMPLETE status
val df2 = df.select(
  $"eventTimestamp".as("completeTimestamp"), $"trackingId"
).where(df("voyageStatus") === "COMPLETE")
// Create a "statusList" per "trackingId" for each row using collect_list over window partitions
val window = Window.partitionBy("trackingId")
val df3 = df.withColumn("statusList", collect_list("voyageStatus").over(window))
// A UDF to check whether statusList contains "COMPLETE"
val checkComplete = udf(
  (l: Seq[String]) => l.contains("COMPLETE")
)
// Join df3 with df2 and apply the UDF to assemble "completionEventTimestamp"
val df4 = df3.join(df2, Seq("trackingId"), "left_outer").
  withColumn(
    "completionEventTimestamp",
    when(checkComplete($"statusList"), $"completeTimestamp").otherwise(-1L)
  ).select(
    "eventTimestamp", "trackingId", "voyageStatus", "completionEventTimestamp"
  )
df4.show()
+--------------+----------+------------+------------------------+
|eventTimestamp|trackingId|voyageStatus|completionEventTimestamp|
+--------------+----------+------------+------------------------+
|           352|        12|      ACTIVE|                     360|
|           360|        12|    COMPLETE|                     360|
|           504|        10|      ACTIVE|                     510|
|           506|        10|      ACTIVE|                     510|
|           510|        10|    COMPLETE|                     510|
|           390|        11|      ACTIVE|                      -1|
|           395|        11|      ACTIVE|                      -1|
|           398|        11|      ACTIVE|                      -1|
+--------------+----------+------------+------------------------+
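
A variant that needs neither the join nor the UDF is a conditional aggregate over the same window. This is a sketch, not part of the answer above: it assumes at most one COMPLETE record per trackingId (as the question states), uses a local SparkSession for illustration, and the names events, byTrackingId, completeTs, labelled and completedOnly are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, max, when}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("completion-sketch").getOrCreate()
import spark.implicits._

val events = Seq(
  (504L, 10, "ACTIVE"),
  (510L, 10, "COMPLETE"),
  (390L, 11, "ACTIVE"),
  (352L, 12, "ACTIVE"),
  (360L, 12, "COMPLETE")
).toDF("eventTimestamp", "trackingId", "voyageStatus")

val byTrackingId = Window.partitionBy("trackingId")

// when(...) without otherwise(...) yields null for non-COMPLETE rows, and max ignores nulls,
// so the result is the COMPLETE timestamp of the partition, or null if there is none.
val completeTs =
  max(when(col("voyageStatus") === "COMPLETE", col("eventTimestamp"))).over(byTrackingId)

// Replace the null of never-completed voyages with the -1 sentinel.
val labelled = events.withColumn("completionEventTimestamp", coalesce(completeTs, lit(-1L)))

// Rows that belong to completed voyages only.
val completedOnly = labelled.filter(col("completionEventTimestamp") =!= -1L)
```

Because everything is expressed with built-in column functions, Spark can optimize the whole expression, whereas a UDF is opaque to the optimizer.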
