Spark Scala SQL基于另一个列创建一个新列



我有一个Dataframe

EmpId  EmpName   Salary   SalaryDate 
1     Amit  1000.0   2016-01-01  
1     Amit  2000.0   2016-02-01  
1      Amit     1000.0   2016-03-01   
1      Amit     2000.0   2016-04-01  
1      Amit     3000.0   2016-05-01  
1      Amit     1000.0   2016-06-01

我想添加一个名为prevSal的新列,其中将包含Amit的前一行工资值的数据

预期输出:

EmpId  EmpName   Salary   SalaryDate prevSal 
1      Amit     1000.0   2016-01-01  null 
1      Amit     2000.0   2016-02-01  1000.0 
1      Amit     1000.0   2016-03-01  2000.0  
1      Amit     2000.0   2016-04-01  1000.0 
1      Amit     3000.0   2016-05-01  2000.0 
1      Amit     1000.0   2016-06-01  3000.0

另外,我想要一个名为NextSal的新列,其中将包含Amit的下一行工资值的数据。预期的输出

EmpId  EmpName   Salary   SalaryDate prevSal  nextSal 
1      Amit     1000.0   2016-01-01   null   2000.0  
1      Amit     2000.0   2016-02-01  1000.0  1000.0 
1      Amit     1000.0   2016-03-01  2000.0  2000.0 
1      Amit     2000.0   2016-04-01  1000.0  3000.0 
1      Amit     3000.0   2016-05-01  2000.0  1000.0 
1      Amit     1000.0   2016-06-01  3000.0   null 

正如gordon在他们的评论中指出的那样,这是一个简单的案例,使用laglead根据当前每次扫描的行访问前一行和下一行的值。

这些函数工作的关键基本上是在DataFrame内强制执行行顺序,以完全确定哪一行在另一行之前或之后。为此,我们使用Spark的一个名为orderBy的窗口函数。您的DataFrame中的orderBy应该用于将用作对行排序的参考的列上。由于您的行已经按SalaryDate列排序,因此这样的内容就足够了:

val w = Window.orderBy("SalaryDate")

关于prevSal列,可以在这里看到lag的使用情况,根据这些信息,我们可以使用上面的窗口函数来访问前一行的Salary值,如下所示(第一个参数是要搜索的列的名称,第二个参数是偏移量,即每次我们想要返回多少行):

lag("Salary", 1).over(w)

lead的工作方式相同,但Salary的前向值。lead的用法如下:

lead("Salary", 1).over(w)

所以所有这些看起来有点像这样(假设df是你的DataFrame的名字):

val w = Window.orderBy("SalaryDate")
df.withColumn("prevSal", lag("Salary", 1).over(w))
.withColumn("nextSal", lead("Salary", 1).over(w))
.show()

所以最终的结果是这样的:

+-----+-------+------+----------+-------+-------+
|EmpId|EmpName|Salary|SalaryDate|prevSal|nextSal|
+-----+-------+------+----------+-------+-------+
|    1|   Amit|  1000|2016-01-01|   null|   2000|
|    1|   Amit|  2000|2016-02-01|   1000|   1000|
|    1|   Amit|  1000|2016-03-01|   2000|   2000|
|    1|   Amit|  2000|2016-04-01|   1000|   3000|
|    1|   Amit|  3000|2016-05-01|   2000|   1000|
|    1|   Amit|  1000|2016-06-01|   3000|   null|
+-----+-------+------+----------+-------+-------+

如果你想让prevSalnextSal针对每个雇主(例如Amit有一组不同的prevSal/nextSal,而不是John),你只需改变窗口函数,首先按EmpName划分表,然后按Salary排序:

val w = Window.partitionBy("EmpName").orderBy("SalaryDate")

最新更新