For every month I have to look back three months and add the previous months' amounts as extra columns using withColumn.
val data = Seq(("1","201706","5"),("1","201707","10"),("2","201604","12"),("2","201601","15")).toDF("id","yyyyMM","amount")
+---+------+------+
| id|yyyyMM|amount|
+---+------+------+
|  1|201706|     5|
|  1|201707|    10|
|  2|201604|    12|
|  2|201601|    15|
+---+------+------+
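The required output should look like the table below. For each month we have to look back three months, which I can do with the Spark window lag function. How can we also add the additional records?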
+---+---------+------+-----------+-------+-----------+-------+
| id|yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|  1|   201709|     0|     201708|      0|     201707|     10|
|  1|   201708|     0|     201707|     10|     201706|      5|
|  1|   201707|    10|     201706|      5|     201705|      0|
|  1|   201706|     5|     201705|      0|     201704|      0|
|  2|   201606|     0|     201605|      0|     201604|     12|
|  2|   201605|     0|     201604|     12|     201603|      0|
|  2|   201604|    12|     201603|      0|     201602|      0|
|  2|   201603|     0|     201602|      0|     201601|     15|
|  2|   201602|     0|     201601|     15|     201512|      0|
|  2|   201601|    15|     201512|      0|     201511|      0|
+---+---------+------+-----------+-------+-----------+-------+
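What I mean is that the first records in the table look ahead, i.e. a few extra months are added after the last known month. Take the following records: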
+---+---------+------+-----------+-------+-----------+-------+
| id|yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|  1|   201709|     0|     201708|      0|     201707|     10|
|  1|   201708|     0|     201707|     10|     201706|      5|
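I don't know if there is a better way, but you need to create the records somewhere, and lag won't do that for you. So first generate the new records from the existing ones; then you can use the lag function.
Maybe something like this: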
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// spark.implicits._ (needed for 'col and $"col") is already imported in spark-shell

// assumed definition, not shown in the original answer: a single window ordered by month
val orderByWindow = Window.orderBy('yearmonth)

data
  // convert the string to an actual date
  .withColumn("yearmonth", to_date('yyyyMM, "yyyyMM"))
  // for each record create 2 additional ones in the future (with 0 amount)
  .select(
    explode(array(
      // original record; amount is cast so all three structs share one type
      struct('id, date_format('yearmonth, "yyyyMM").as("yearmonth"), 'amount.cast("double").as("amount")),
      // 1 month in the future
      struct('id, date_format(add_months('yearmonth, 1), "yyyyMM").as("yearmonth"), lit(0.0).as("amount")),
      // 2 months in the future
      struct('id, date_format(add_months('yearmonth, 2), "yyyyMM").as("yearmonth"), lit(0.0).as("amount"))
    )).as("record"))
  // keep 1 record per month
  .groupBy($"record.yearmonth")
  .agg(
    min($"record.id").as("id"),
    sum($"record.amount").as("amount")
  )
  // final structure (with lag fields)
  .select(
    'id,
    'yearmonth,
    'amount,
    lag('yearmonth, 1).over(orderByWindow).as("yearmonth-1"),
    lag('amount, 1, 0.0).over(orderByWindow).as("amount2"),
    lag('yearmonth, 2).over(orderByWindow).as("yearmonth-2"),
    lag('amount, 2, 0.0).over(orderByWindow).as("amount3")
  )
  .orderBy('yearmonth.desc)
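This isn't perfect, but it's a start. In the output, the look-back columns for the earliest months of id 1 pick up months belonging to id 2 (201606, 201605), and the earliest rows overall show null: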
+---+---------+------+-----------+-------+-----------+-------+
|id |yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|1 |201709 |0.0 |201708 |0.0 |201707 |10.0 |
|1 |201708 |0.0 |201707 |10.0 |201706 |5.0 |
|1 |201707 |10.0 |201706 |5.0 |201606 |0.0 |
|1 |201706 |5.0 |201606 |0.0 |201605 |0.0 |
|2 |201606 |0.0 |201605 |0.0 |201604 |12.0 |
|2 |201605 |0.0 |201604 |12.0 |201603 |0.0 |
|2 |201604 |12.0 |201603 |0.0 |201602 |0.0 |
|2 |201603 |0.0 |201602 |0.0 |201601 |15.0 |
|2 |201602 |0.0 |201601 |15.0 |null |0.0 |
|2 |201601 |15.0 |null |0.0 |null |0.0 |
+---+---------+------+-----------+-------+-----------+-------+
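The null and cross-id values above appear because lag can only return rows that exist. As a minimal sketch of one alternative, the look-back months can be derived by calendar arithmetic and the amounts looked up per id, defaulting to 0. The version below uses plain Scala collections rather than Spark, just to show the logic; `LookBackSketch`, `lookBack`, `toIndex`, `fromIndex`, and `monthsAhead` are hypothetical names, and amounts are assumed to be integers.

```scala
object LookBackSketch {
  // One output row: a month plus the two preceding months and their amounts.
  final case class Row(id: String, yearmonth: String, amount: Int,
                       yearmonth1: String, amount2: Int,
                       yearmonth2: String, amount3: Int)

  // "201706" -> months since year 0, so month arithmetic is plain Int arithmetic.
  def toIndex(yyyyMM: String): Int = yyyyMM.take(4).toInt * 12 + yyyyMM.drop(4).toInt - 1
  def fromIndex(i: Int): String = f"${i / 12}%04d${i % 12 + 1}%02d"

  def lookBack(data: Seq[(String, String, Int)], monthsAhead: Int = 2): Seq[Row] =
    data.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (id, recs) =>
      // Known amount per month for this id, summed if a month occurs more than once.
      val known: Map[Int, Int] =
        recs.groupBy(r => toIndex(r._2)).map { case (m, rs) => m -> rs.map(_._3).sum }
      val first = known.keys.min
      val last = known.keys.max + monthsAhead
      // Every month from the first record to monthsAhead past the last, newest first.
      (last to first by -1).map { m =>
        def amt(k: Int): Int = known.getOrElse(k, 0) // months without a record count as 0
        Row(id, fromIndex(m), amt(m), fromIndex(m - 1), amt(m - 1), fromIndex(m - 2), amt(m - 2))
      }
    }
}
```

Called with the four records from the question, `LookBackSketch.lookBack` returns ten rows, one per month from each id's first record to two months past its last. In Spark, the same idea would presumably translate to generating the month range per id and joining the amounts back in, instead of relying on lag over existing rows.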