方案:
我有以下的以下数据帧
``` -- -----------------------------------
companyId | calc_date | mean |
----------------------------------
1111 | 01-08-2002 | 15 |
----------------------------------
1111 | 02-08-2002 | 16.5 |
----------------------------------
1111 | 03-08-2002 | 17 |
----------------------------------
1111 | 04-08-2002 | 15 |
----------------------------------
1111 | 05-08-2002 | 23 |
----------------------------------
1111 | 06-08-2002 | 22.6 |
----------------------------------
1111 | 07-08-2002 | 25 |
----------------------------------
1111 | 08-08-2002 | 15 |
----------------------------------
1111 | 09-08-2002 | 15 |
----------------------------------
1111 | 10-08-2002 | 16.5 |
----------------------------------
1111 | 11-08-2002 | 22.6 |
----------------------------------
1111 | 12-08-2002 | 15 |
----------------------------------
1111 | 13-08-2002 | 16.5 |
----------------------------------
1111 | 14-08-2002 | 25 |
----------------------------------
1111 | 15-08-2002 | 16.5 |
----------------------------------
```
必需:
需要计算给定数据的5天平均值,10天平均值,每家公司的每一个记录的平均值15天。
5 day-mean --> Previous 5 days available mean sum
10 day-mean --> Previous 10 days available mean sum
15 day-mean --> Previous 15 days available mean sum
由此产生的dataFrame应为
以下的列。 ----------------------------------------------------------------------------
companyId | calc_date | mean | 5 day-mean | 10-day mean | 15-day mean |
----------------------------------------------------------------------------
问题:
如何实现这一目标? 在Spark中做到这一点的最佳方法是什么?
这是使用公司使用窗口分区的一种方法,以通过rangeBetween
在指定的时间戳范围内计算当前行和以前的行之间的n-day mean
,如下所示(使用虚拟数据集):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val df = (1 to 3).flatMap(i => Seq.tabulate(15)(j => (i, s"${j+1}-2-2019", j+1))).
toDF("company_id", "calc_date", "mean")
df.show
// +----------+---------+----+
// |company_id|calc_date|mean|
// +----------+---------+----+
// | 1| 1-2-2019| 1|
// | 1| 2-2-2019| 2|
// | 1| 3-2-2019| 3|
// | 1| 4-2-2019| 4|
// | 1| 5-2-2019| 5|
// | ... |
// | 1|14-2-2019| 14|
// | 1|15-2-2019| 15|
// | 2| 1-2-2019| 1|
// | 2| 2-2-2019| 2|
// | 2| 3-2-2019| 3|
// | ... |
// +----------+---------+----+
def winSpec = Window.partitionBy("company_id").orderBy("ts")
def dayRange(days: Int) = winSpec.rangeBetween(-(days * 24 * 60 * 60), 0)
df.
withColumn("ts", unix_timestamp(to_date($"calc_date", "d-M-yyyy"))).
withColumn("mean-5", mean($"mean").over(dayRange(5))).
withColumn("mean-10", mean($"mean").over(dayRange(10))).
withColumn("mean-15", mean($"mean").over(dayRange(15))).
show
// +----------+---------+----+----------+------+-------+-------+
// |company_id|calc_date|mean| ts|mean-5|mean-10|mean-15|
// +----------+---------+----+----------+------+-------+-------+
// | 1| 1-2-2019| 1|1549008000| 1.0| 1.0| 1.0|
// | 1| 2-2-2019| 2|1549094400| 1.5| 1.5| 1.5|
// | 1| 3-2-2019| 3|1549180800| 2.0| 2.0| 2.0|
// | 1| 4-2-2019| 4|1549267200| 2.5| 2.5| 2.5|
// | 1| 5-2-2019| 5|1549353600| 3.0| 3.0| 3.0|
// | 1| 6-2-2019| 6|1549440000| 3.5| 3.5| 3.5|
// | 1| 7-2-2019| 7|1549526400| 4.5| 4.0| 4.0|
// | 1| 8-2-2019| 8|1549612800| 5.5| 4.5| 4.5|
// | 1| 9-2-2019| 9|1549699200| 6.5| 5.0| 5.0|
// | 1|10-2-2019| 10|1549785600| 7.5| 5.5| 5.5|
// | 1|11-2-2019| 11|1549872000| 8.5| 6.0| 6.0|
// | 1|12-2-2019| 12|1549958400| 9.5| 7.0| 6.5|
// | 1|13-2-2019| 13|1550044800| 10.5| 8.0| 7.0|
// | 1|14-2-2019| 14|1550131200| 11.5| 9.0| 7.5|
// | 1|15-2-2019| 15|1550217600| 12.5| 10.0| 8.0|
// | 3| 1-2-2019| 1|1549008000| 1.0| 1.0| 1.0|
// | 3| 2-2-2019| 2|1549094400| 1.5| 1.5| 1.5|
// | 3| 3-2-2019| 3|1549180800| 2.0| 2.0| 2.0|
// | 3| 4-2-2019| 4|1549267200| 2.5| 2.5| 2.5|
// | 3| 5-2-2019| 5|1549353600| 3.0| 3.0| 3.0|
// +----------+---------+----+----------+------+-------+-------+
// only showing top 20 rows
请注意,如果保证日期是连续的每天时间序列,则可以直接在calc_date
上使用rowsBetween
(而不是rangeBetween
)。