Look back N months and add them as columns in a Spark SQL aggregation



For every record I have to look back three months and add the previous months' amounts as columns using withColumn.

val data = Seq(("1","201706","5"),("1","201707","10"),("2","201604","12"),("2","201601","15")).toDF("id","yyyyMM","amount")
+---+------+------+
| id|yyyyMM|amount|
+---+------+------+
|  1|201706|     5|
|  1|201707|    10|
|  2|201604|    12|
|  2|201601|    15|
+---+------+------+

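The desired output should look like the table below. For each month we have to look back three months, which I can do with Spark's window lag function. How should we add the ability to include the additional records?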

+---+---------+------+-----------+-------+-----------+-------+
| id|yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|  1|   201709|     0|     201708|      0|     201707|     10|
|  1|   201708|     0|     201707|     10|     201706|      5|
|  1|   201707|    10|     201706|      5|     201705|      0|
|  1|   201706|     5|     201705|      0|     201704|      0|
|  2|   201606|     0|     201605|      0|     201604|     12|
|  2|   201605|     0|     201604|     12|     201603|      0|
|  2|   201604|    12|     201603|      0|     201602|      0|
|  2|   201603|     0|     201602|      0|     201601|     15|
|  2|   201602|     0|     201601|     15|     201512|      0|
|  2|   201601|    15|     201512|      0|     201511|      0|
+---+---------+------+-----------+-------+-----------+-------+

What I mean is that the first records in the table look ahead, that is, a few extra months are added. Take the following records.

+---+---------+------+-----------+-------+-----------+-------+
| id|yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|  1|   201709|     0|     201708|      0|     201707|     10|
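|  1|   201708|     0|     201707|     10|     201706|      5|
+---+---------+------+-----------+-------+-----------+-------+

I don't know if there is a better way, but you need to create the records somewhere; lag won't do that. So first you need to generate the new records based on the current ones. Then you can use the lag function.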
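The two steps described above (generate the extra months first, then look back) can be illustrated without Spark. This is only a minimal sketch in plain Scala using java.time.YearMonth; the names parse, totals and lookBack are hypothetical helpers introduced for illustration, and amounts are assumed to be integers.

```scala
import java.time.YearMonth

// Parse a "yyyyMM" string such as "201706" (hypothetical helper)
def parse(ym: String): YearMonth = YearMonth.of(ym.take(4).toInt, ym.drop(4).toInt)

// Sample records from the question: (id, month, amount)
val records = Seq(("1", "201706", 5), ("1", "201707", 10), ("2", "201604", 12), ("2", "201601", 15))
  .map { case (id, ym, amount) => (id, parse(ym), amount) }

// Step 1: for every record also emit the next two months with amount 0,
// then sum per (id, month) so each month appears once
val totals: Map[(String, YearMonth), Int] = records
  .flatMap { case (id, ym, amount) =>
    Seq((id, ym, amount), (id, ym.plusMonths(1), 0), (id, ym.plusMonths(2), 0))
  }
  .groupBy { case (id, ym, _) => (id, ym) }
  .map { case (key, rows) => key -> rows.map(_._3).sum }

// Step 2: look back n months for the same id; a missing month counts as 0
def lookBack(id: String, ym: YearMonth, n: Int): Int =
  totals.getOrElse((id, ym.minusMonths(n)), 0)
```

For example, lookBack("1", parse("201709"), 2) reads the total stored for id 1 in 201707.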

Maybe something like this:

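// imports the snippet relies on (spark.implicits._ is assumed to be in scope, as for toDF above)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// orderByWindow is not defined in the original post; a single global window ordered
// by month is an assumption that reproduces the output shown below
val orderByWindow = Window.orderBy('yearmonth)

data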
  // convert the string to an actual date
  .withColumn("yearmonth", to_date('yyyyMM, "yyyyMM"))
  // for each record create 2 additional in the future (with 0 amount)
  .select(
  explode(array(
    // org record
    struct('id, date_format('yearmonth, "yyyyMM").as("yearmonth"), 'amount),
    // 1 month in future
    struct('id, date_format(add_months('yearmonth, 1), "yyyyMM").as("yearmonth"), lit(0).as("amount")),
    // 2 months in future
    struct('id, date_format(add_months('yearmonth, 2), "yyyyMM").as("yearmonth"), lit(0).as("amount"))
  )).as("record"))
  // keep 1 record per id and month (grouping by id as well keeps ids that share a month apart)
  .groupBy($"record.id", $"record.yearmonth")
  .agg(
    sum($"record.amount").as("amount")
  )
  // final structure (with lag fields)
  .select(
    'id,
    'yearmonth,
    'amount,
     lag('yearmonth, 1).over(orderByWindow).as("yearmonth-1"),
     lag('amount, 1, 0).over(orderByWindow).as("amount2"),
     lag('yearmonth, 2).over(orderByWindow).as("yearmonth-2"),
     lag('amount, 2, 0).over(orderByWindow).as("amount3")
  )
  .orderBy('yearmonth.desc)

This isn't perfect, but it's a start:

+---+---------+------+-----------+-------+-----------+-------+
|id |yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|1  |201709   |0.0   |201708     |0.0    |201707     |10.0   |
|1  |201708   |0.0   |201707     |10.0   |201706     |5.0    |
|1  |201707   |10.0  |201706     |5.0    |201606     |0.0    |
|1  |201706   |5.0   |201606     |0.0    |201605     |0.0    |
|2  |201606   |0.0   |201605     |0.0    |201604     |12.0   |
|2  |201605   |0.0   |201604     |12.0   |201603     |0.0    |
|2  |201604   |12.0  |201603     |0.0    |201602     |0.0    |
|2  |201603   |0.0   |201602     |0.0    |201601     |15.0   |
|2  |201602   |0.0   |201601     |15.0   |null       |0.0    |
|2  |201601   |15.0  |null       |0.0    |null       |0.0    |
+---+---------+------+-----------+-------+-----------+-------+
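In the output above, the look-back for 201706 reaches into id 2 (201606) and the earliest rows show null months, because lag only sees rows that exist. One possible way around this, offered as a sketch and not as the original answer's method, is to compute the previous months with add_months and fetch their amounts with left joins instead of lag. The names monthly, rows and lookup are hypothetical, amounts are assumed to be integers, and a local SparkSession on Spark 2.2 or later is assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Assumption: a local session; in spark-shell the existing one is reused
val spark = SparkSession.builder().master("local[*]").appName("lookback").getOrCreate()
import spark.implicits._

val data = Seq(("1", "201706", "5"), ("1", "201707", "10"), ("2", "201604", "12"), ("2", "201601", "15"))
  .toDF("id", "yyyyMM", "amount")

// Monthly totals per id, with the month as a real date
val monthly = data
  .select($"id", to_date($"yyyyMM", "yyyyMM").as("month"), $"amount".cast("int").as("amount"))
  .groupBy($"id", $"month")
  .agg(sum($"amount").as("amount"))

// Output rows: every recorded month plus the two months after it, once per id
val rows = monthly
  .select($"id", explode(array((0 to 2).map(i => add_months($"month", i)): _*)).as("month"))
  .distinct()

// Amounts shifted forward by n months, so joining on (id, month) looks back n months
def lookup(n: Int): DataFrame =
  monthly.select($"id", add_months($"month", n).as("month"), $"amount".as(s"amount_$n"))

val result = (0 to 2)
  .foldLeft(rows) { (df, n) => df.join(lookup(n), Seq("id", "month"), "left") }
  .select(
    $"id",
    date_format($"month", "yyyyMM").as("yearmonth"),
    coalesce($"amount_0", lit(0)).as("amount"),
    date_format(add_months($"month", -1), "yyyyMM").as("yearmonth-1"),
    coalesce($"amount_1", lit(0)).as("amount2"),
    date_format(add_months($"month", -2), "yyyyMM").as("yearmonth-2"),
    coalesce($"amount_2", lit(0)).as("amount3")
  )
  .orderBy($"id", $"yearmonth".desc)

result.show()
```

Because the join key includes id, a look-back never crosses into another id, and months without data come back as 0 instead of null. If lag is preferred, partitioning the window by id would address the first problem but not the missing months.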
