如何计算给定数据的 5 天平均值、10 天平均值和 15 天平均值?



方案:

我有以下的以下数据帧

```     -- -----------------------------------
        companyId | calc_date   | mean   |
        ----------------------------------
        1111      | 01-08-2002  |  15    |
        ----------------------------------
        1111      | 02-08-2002  |  16.5   |
        ----------------------------------
        1111      | 03-08-2002  |  17     |
        ----------------------------------
        1111      | 04-08-2002  |  15     |
        ----------------------------------
        1111      | 05-08-2002  |  23     |
        ----------------------------------
        1111      | 06-08-2002  |  22.6   |
        ----------------------------------
        1111      | 07-08-2002  |  25     | 
        ----------------------------------
        1111      | 08-08-2002  |  15     |
        ----------------------------------
        1111      | 09-08-2002  |  15     |
        ----------------------------------
        1111      | 10-08-2002  |  16.5   |
        ----------------------------------
        1111      | 11-08-2002  |  22.6   |
        ----------------------------------
        1111      | 12-08-2002  |  15     |
        ----------------------------------
        1111      | 13-08-2002  |  16.5   |
        ----------------------------------
        1111      | 14-08-2002  |  25     |
        ----------------------------------
        1111      | 15-08-2002  |  16.5   |
        ----------------------------------
```

必需:

需要计算给定数据的5天平均值,10天平均值,每家公司的每一个记录的平均值15天。

5 day-mean   -->  Previous 5 days available mean sum
10 day-mean  --> Previous 10 days available mean sum
15 day-mean  --> Previous 15 days available mean sum

由此产生的dataFrame应为

以下的列。
        ----------------------------------------------------------------------------
        companyId | calc_date   | mean   |  5 day-mean | 10-day mean | 15-day mean |
        ----------------------------------------------------------------------------

问题:
如何实现这一目标? 在Spark中做到这一点的最佳方法是什么?

这是使用公司使用窗口分区的一种方法,以通过rangeBetween在指定的时间戳范围内计算当前行和以前的行之间的n-day mean,如下所示(使用虚拟数据集):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val df = (1 to 3).flatMap(i => Seq.tabulate(15)(j => (i, s"${j+1}-2-2019", j+1))).
  toDF("company_id", "calc_date", "mean")
df.show
// +----------+---------+----+
// |company_id|calc_date|mean|
// +----------+---------+----+
// |         1| 1-2-2019|   1|
// |         1| 2-2-2019|   2|
// |         1| 3-2-2019|   3|
// |         1| 4-2-2019|   4|
// |         1| 5-2-2019|   5|
// |         ...             |
// |         1|14-2-2019|  14|
// |         1|15-2-2019|  15|
// |         2| 1-2-2019|   1|
// |         2| 2-2-2019|   2|
// |         2| 3-2-2019|   3|
// |         ...             |
// +----------+---------+----+
def winSpec = Window.partitionBy("company_id").orderBy("ts")
def dayRange(days: Int) = winSpec.rangeBetween(-(days * 24 * 60 * 60), 0)
df.
  withColumn("ts", unix_timestamp(to_date($"calc_date", "d-M-yyyy"))).
  withColumn("mean-5", mean($"mean").over(dayRange(5))).
  withColumn("mean-10", mean($"mean").over(dayRange(10))).
  withColumn("mean-15", mean($"mean").over(dayRange(15))).
  show
// +----------+---------+----+----------+------+-------+-------+
// |company_id|calc_date|mean|        ts|mean-5|mean-10|mean-15|
// +----------+---------+----+----------+------+-------+-------+
// |         1| 1-2-2019|   1|1549008000|   1.0|    1.0|    1.0|
// |         1| 2-2-2019|   2|1549094400|   1.5|    1.5|    1.5|
// |         1| 3-2-2019|   3|1549180800|   2.0|    2.0|    2.0|
// |         1| 4-2-2019|   4|1549267200|   2.5|    2.5|    2.5|
// |         1| 5-2-2019|   5|1549353600|   3.0|    3.0|    3.0|
// |         1| 6-2-2019|   6|1549440000|   3.5|    3.5|    3.5|
// |         1| 7-2-2019|   7|1549526400|   4.5|    4.0|    4.0|
// |         1| 8-2-2019|   8|1549612800|   5.5|    4.5|    4.5|
// |         1| 9-2-2019|   9|1549699200|   6.5|    5.0|    5.0|
// |         1|10-2-2019|  10|1549785600|   7.5|    5.5|    5.5|
// |         1|11-2-2019|  11|1549872000|   8.5|    6.0|    6.0|
// |         1|12-2-2019|  12|1549958400|   9.5|    7.0|    6.5|
// |         1|13-2-2019|  13|1550044800|  10.5|    8.0|    7.0|
// |         1|14-2-2019|  14|1550131200|  11.5|    9.0|    7.5|
// |         1|15-2-2019|  15|1550217600|  12.5|   10.0|    8.0|
// |         3| 1-2-2019|   1|1549008000|   1.0|    1.0|    1.0|
// |         3| 2-2-2019|   2|1549094400|   1.5|    1.5|    1.5|
// |         3| 3-2-2019|   3|1549180800|   2.0|    2.0|    2.0|
// |         3| 4-2-2019|   4|1549267200|   2.5|    2.5|    2.5|
// |         3| 5-2-2019|   5|1549353600|   3.0|    3.0|    3.0|
// +----------+---------+----+----------+------+-------+-------+
// only showing top 20 rows

请注意,如果保证日期是连续的每天时间序列,则可以直接在calc_date上使用rowsBetween(而不是rangeBetween)。

相关内容

  • 没有找到相关文章

最新更新