我想为下面的数据帧的ID组创建一个新列,该列的值为上一个日期(日期减去当前日期)
+---+----------+-----+
| id| date|value|
+---+----------+-----+
| a|2015-04-11| 300|
| a|2015-04-12| 400|
| a|2015-04-12| 200|
| a|2015-04-12| 100|
| a|2015-04-11| 700|
| b|2015-04-02| 100|
| b|2015-04-12| 100|
| c|2015-04-12| 400|
+---+----------+-----+
我试过铅窗功能。
val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")
var w1=Window.partitionBy("id").orderBy("date".desc)
var leadc1=lead(df1("value"),1).over(w1)
val df2=df1.withColumn("nvalue",leadc1)
+---+----------+-----+------+
| id| date|value|nvalue|
+---+----------+-----+------+
| a|2015-04-12| 400| 200|
| a|2015-04-12| 200| 100|
| a|2015-04-12| 100| 300|
| a|2015-04-11| 300| 700|
| a|2015-04-11| 700| null|
| b|2015-04-12| 100| 100|
| b|2015-04-02| 100| null|
| c|2015-04-12| 400| null|
+---+----------+-----+------+
但正如我们所看到的,当我在id"a"中有相同的日期时,我得到了错误的结果。结果应该像一样
+---+----------+-----+------+
| id| date|value|nvalue|
+---+----------+-----+------+
| a|2015-04-12| 400| 300|
| a|2015-04-12| 200| 300|
| a|2015-04-12| 100| 300|
| a|2015-04-11| 300| null|
| a|2015-04-11| 700| null|
| b|2015-04-12| 100| 100|
| b|2015-04-02| 100| null|
| c|2015-04-12| 400| null|
+---+----------+-----+------+
我已经有了一个使用join的解决方案,尽管我正在寻找一个使用window函数的解决方案。
感谢
问题是您有多个日期相同的行。lead
将取结果集中下一个行的value
,而不是下一个日期。因此,当您按日期降序对行进行排序时,下一行可能是相同的日期。
如何确定用于特定日期的正确值?例如,为什么你从(id=a,date=2015-04-11)中取300,而不是700?
要使用窗口函数做到这一点,您可能需要进行多次传递——这将采用最后一个nvalue
,并将其应用于同一id/日期分组中的所有行——但我不确定行最初是如何排序的。
val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value")
var w1 = Window.partitionBy("id").orderBy("date".desc)
var leadc1 = lead(df1("value"),1).over(w1)
val df2 = df1.withColumn("nvalue",leadc1)
val w2 = Window.partitionBy("id", "date").orderBy("??? some way to distinguish row ordering")
val df3 = df1.withColumn("nvalue2", last_value("nvalue").over(w2))