我有一个Dataframe
EmpId EmpName Salary SalaryDate
1 Amit 1000.0 2016-01-01
1 Amit 2000.0 2016-02-01
1 Amit 1000.0 2016-03-01
1 Amit 2000.0 2016-04-01
1 Amit 3000.0 2016-05-01
1 Amit 1000.0 2016-06-01
我想添加一个名为prevSal的新列,其中将包含Amit的前一行工资值的数据
预期输出:
EmpId EmpName Salary SalaryDate prevSal
1 Amit 1000.0 2016-01-01 null
1 Amit 2000.0 2016-02-01 1000.0
1 Amit 1000.0 2016-03-01 2000.0
1 Amit 2000.0 2016-04-01 1000.0
1 Amit 3000.0 2016-05-01 2000.0
1 Amit 1000.0 2016-06-01 3000.0
另外,我想要一个名为NextSal的新列,其中将包含Amit的下一行工资值的数据。预期的输出
EmpId EmpName Salary SalaryDate prevSal nextSal
1 Amit 1000.0 2016-01-01 null 2000.0
1 Amit 2000.0 2016-02-01 1000.0 1000.0
1 Amit 1000.0 2016-03-01 2000.0 2000.0
1 Amit 2000.0 2016-04-01 1000.0 3000.0
1 Amit 3000.0 2016-05-01 2000.0 1000.0
1 Amit 1000.0 2016-06-01 3000.0 null
正如gordon在他们的评论中指出的那样,这是一个简单的案例,使用lag
和lead
根据当前每次扫描的行访问前一行和下一行的值。
这些函数工作的关键基本上是在DataFrame内强制执行行顺序,以完全确定哪一行在另一行之前或之后。为此,我们使用Spark的一个名为orderBy
的窗口函数。您的DataFrame中的orderBy
应该用于将用作对行排序的参考的列上。由于您的行已经按SalaryDate
列排序,因此这样的内容就足够了:
val w = Window.orderBy("SalaryDate")
关于prevSal
列,可以在这里看到lag
的使用情况,根据这些信息,我们可以使用上面的窗口函数来访问前一行的Salary
值,如下所示(第一个参数是要搜索的列的名称,第二个参数是偏移量,即每次我们想要返回多少行):
lag("Salary", 1).over(w)
lead
的工作方式相同,但Salary
的前向值。lead
的用法如下:
lead("Salary", 1).over(w)
所以所有这些看起来有点像这样(假设df
是你的DataFrame的名字):
val w = Window.orderBy("SalaryDate")
df.withColumn("prevSal", lag("Salary", 1).over(w))
.withColumn("nextSal", lead("Salary", 1).over(w))
.show()
所以最终的结果是这样的:
+-----+-------+------+----------+-------+-------+
|EmpId|EmpName|Salary|SalaryDate|prevSal|nextSal|
+-----+-------+------+----------+-------+-------+
| 1| Amit| 1000|2016-01-01| null| 2000|
| 1| Amit| 2000|2016-02-01| 1000| 1000|
| 1| Amit| 1000|2016-03-01| 2000| 2000|
| 1| Amit| 2000|2016-04-01| 1000| 3000|
| 1| Amit| 3000|2016-05-01| 2000| 1000|
| 1| Amit| 1000|2016-06-01| 3000| null|
+-----+-------+------+----------+-------+-------+
如果你想让prevSal
和nextSal
针对每个雇主(例如Amit有一组不同的prevSal
/nextSal
,而不是John),你只需改变窗口函数,首先按EmpName
划分表,然后按Salary
排序:
val w = Window.partitionBy("EmpName").orderBy("SalaryDate")