使用Spark窗口函数可以在数据帧中创建列



我想为下面的数据帧的ID组创建一个新列,该列的值为上一个日期(日期减去当前日期)

+---+----------+-----+
| id|      date|value|
+---+----------+-----+
|  a|2015-04-11|  300|
|  a|2015-04-12|  400|
|  a|2015-04-12|  200|
|  a|2015-04-12|  100|
|  a|2015-04-11|  700|
|  b|2015-04-02|  100|
|  b|2015-04-12|  100|
|  c|2015-04-12|  400|
+---+----------+-----+

我试过铅窗功能。

val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")
 var w1=Window.partitionBy("id").orderBy("date".desc)
 var leadc1=lead(df1("value"),1).over(w1)
 val df2=df1.withColumn("nvalue",leadc1)
+---+----------+-----+------+                                                   
| id|      date|value|nvalue|
+---+----------+-----+------+
|  a|2015-04-12|  400|   200|
|  a|2015-04-12|  200|   100|
|  a|2015-04-12|  100|   300|
|  a|2015-04-11|  300|   700|
|  a|2015-04-11|  700|  null|
|  b|2015-04-12|  100|   100|
|  b|2015-04-02|  100|  null|
|  c|2015-04-12|  400|  null|
+---+----------+-----+------+

但正如我们所看到的,当我在id"a"中有相同的日期时,我得到了错误的结果。结果应该像一样

+---+----------+-----+------+                                                   
| id|      date|value|nvalue|
+---+----------+-----+------+
|  a|2015-04-12|  400|   300|
|  a|2015-04-12|  200|   300|
|  a|2015-04-12|  100|   300|
|  a|2015-04-11|  300|  null|
|  a|2015-04-11|  700|  null|
|  b|2015-04-12|  100|   100|
|  b|2015-04-02|  100|  null|
|  c|2015-04-12|  400|  null|
+---+----------+-----+------+

我已经有了一个使用join的解决方案,尽管我正在寻找一个使用window函数的解决方案。

感谢

问题是您有多个日期相同的行。lead将取结果集中下一个value,而不是下一个日期。因此,当您按日期降序对行进行排序时,下一行可能是相同的日期。

如何确定用于特定日期的正确值?例如,为什么你从(id=a,date=2015-04-11)中取300,而不是700?

要使用窗口函数做到这一点,您可能需要进行多次传递——这将采用最后一个nvalue,并将其应用于同一id/日期分组中的所有行——但我不确定行最初是如何排序的。

 val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")
var w1 = Window.partitionBy("id").orderBy("date".desc)
var leadc1 = lead(df1("value"),1).over(w1)
val df2 = df1.withColumn("nvalue",leadc1)
val w2 = Window.partitionBy("id", "date").orderBy("??? some way to distinguish row ordering")
val df3 = df1.withColumn("nvalue2", last_value("nvalue").over(w2))

相关内容

  • 没有找到相关文章

最新更新