我有两个数据帧,一个具有id
的唯一值,另一个可以具有多个不同id
的值。
这是数据帧df1
:
id | dt| speed | stats
358899055773504 2018-07-31 18:38:34 0 [9,-1,-1,13,0,1,0]
358899055773505 2018-07-31 18:48:23 4 [8,-1,0,22,1,1,1]
df2
:
id | dt| speed | stats
358899055773504 2018-07-31 18:38:34 0 [9,-1,-1,13,0,1,0]
358899055773505 2018-07-31 18:54:23 4 [9,0,0,22,1,1,1]
358899055773504 2018-07-31 18:58:34 0 [9,0,-1,22,0,1,0]
358899055773504 2018-07-31 18:28:34 0 [9,0,-1,22,0,1,0]
358899055773505 2018-07-31 18:38:23 4 [8,-1,0,22,1,1,1]
我的目标是将第二个数据帧与第一个数据帧进行比较,并更新第一个数据帧中的值,仅当特定id
df2
的dt
值大于df1
中的值,并且如果它满足大于条件,则也比较其他字段。
您需要将两个数据帧join
在一起,才能对其列进行比较。
您可以做的是首先联接数据帧,然后执行所有筛选以获取包含应更新的所有行的新数据帧:
val diffDf = df1.as("a").join(df2.as("b"), Seq("id"))
.filter($"b.dt" > $"a.dt")
.filter(...) // Any other filter required
.select($"id", $"b.dt", $"b.speed", $"b.stats")
注意:在某些情况下,需要执行groupBy(id)
或使用窗口函数,因为diffDf
数据帧中每个id
应该只有一个最后一行。这可以按如下方式完成(这里的例子将选择速度最大的行,但这取决于实际要求(:
val w = Window.partitionBy($"id").orderBy($"speed".desc)
val diffDf2 = diffDf.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
有关不同方法的更深入信息,请参阅此处:如何最大化值并保留所有列(对于每个组的最大记录数(?。
要将旧行替换为df1
数据帧中的相同id
,请将数据帧与外部联接合并并coalesce
:
val df = df1.as("a").join(diffDf.as("b"), Seq("id"), "outer")
.select(
$"id",
coalesce($"b.dt", $"a.dt").as("dt"),
coalesce($"b.speed", $"a.speed").as("speed"),
coalesce($"b.stats", $"a.stats").as("stats")
)
coalesce
通过首先尝试从diffDf
(b
(数据帧中获取值来工作。如果该值为 null,它将从df1
(a
( 中获取该值。
仅将时间过滤器与提供的示例输入数据帧一起使用时的结果:
+---------------+-------------------+-----+-----------------+
| id| dt|speed| stats|
+---------------+-------------------+-----+-----------------+
|358899055773504|2018-07-31 18:58:34| 0|[9,0,-1,22,0,1,0]|
|358899055773505|2018-07-31 18:54:23| 4| [9,0,0,22,1,1,1]|
+---------------+-------------------+-----+-----------------+