给定此数据帧 df
+-----------+--------------------+-------------+-------+
|CustNumb | PurchaseDate| price| activeFlag|
+-----------+--------------------+-------------+-------+
| 3|2013-07-17 00:00:...| 17.9| 0|
| 3|2013-08-27 00:00:...| 61.13| 0|
| 3|2013-08-28 00:00:...| 25.07| 1|
| 3|2013-08-29 00:00:...| 24.23| 0|
| 3|2013-09-06 00:00:...| 3.94| 0|
| 20|2013-02-28 00:00:...| 354.64| 0|
| 20|2013-04-07 00:00:...| 15.0| 0|
| 20|2013-05-10 00:00:...| 545.0| 0|
| 28|2013-02-17 00:00:...| 190.0| 0|
| 28|2013-04-08 00:00:...| 20.0| 0|
| 28|2013-04-16 00:00:...| 89.0| 0|
| 28|2013-05-18 00:00:...| 260.0| 0|
| 28|2013-06-06 00:00:...| 586.57| 1|
| 28|2013-06-09 00:00:...| 250.0| 0|
当它找到一个非活动标志"1"时,我想得到返回按购买日期订购前后 2 行的平均价格的结果。这是我正在寻找的结果:
+-----------+--------------------+-------------+-------+---------------+
|CustNumb | PurchaseDate| price| activeFlag| OutputVal |
+-----------+--------------------+-------------+-------+------------+
| 3|2013-07-17 00:00:...| 17.9| 0| 17.9
| 3|2013-08-27 00:00:...| 61.13| 0| 61.13
| 3|2013-08-28 00:00:...| 25.07| 1| 26.8 (avg of 2 prices before and 2 after)
| 3|2013-08-29 00:00:...| 24.23| 0| 24.23
| 3|2013-09-06 00:00:...| 3.94| 0| 3.94
| 20|2013-02-28 00:00:...| 354.64| 0| 354.64
| 20|2013-04-07 00:00:...| 15.0| 0| 15.0
| 20|2013-05-10 00:00:...| 545.0| 0| 545.0
| 28|2013-02-17 00:00:...| 190.0| 0| 190.0
| 28|2013-04-08 00:00:...| 20.0| 0| 20.0
| 28|2013-04-16 00:00:...| 89.0| 0| 89.0
| 28|2013-05-18 00:00:...| 260.0| 0| 260.0
| 28|2013-06-06 00:00:...| 586.57| 1| 199.6 (avg of 2 prices before and 1 after)
| 28|2013-06-09 00:00:...| 250.0| 0| 250
在上面的 custNum 3 和 28 示例中,我有 activeFlag 1,因此我需要计算前后 2 行的平均值,如果它以相同的 custNumb 存在。
我正在考虑在数据帧上使用窗口函数,但由于我对 Spark 编程很陌生,因此无法获得任何好主意来解决这个问题
val w = Window.partitionBy("CustNumb").orderBy("PurchaseDate")
我如何实现这一点,它可以通过窗口功能或任何更好的方法来实现吗?
如果你已经有窗口,像这样的简单条件应该可以正常工作:
val cond = ($"activeFlag" === 1) && (lag($"activeFlag", 1).over(w) === 0)
// Windows covering rows before and after
val before = w.rowsBetween(-2, -1)
val after = w.rowsBetween(1, 2)
// Expression with sum of rows and number of rows
val sumPrice = sum($"price").over(before) + sum($"price").over(after)
val countPrice = sum($"ones_").over(before) + sum($"ones_").over(after)
val expr = when(cond, sumPrice / countPrice).otherwise($"price")
df.withColumn("ones_", lit(1)).withColumn("outputVal", expr)
谢谢 Zero323。你真棒!!这是我基于您帮助的代码片段,我修改了以获取我正在寻找的数据结果:
val windw = Window.partitionBy("CustNumb").orderBy("PurchaseDate")
val cond = ($"activeFlag" === 1) //&& (lag($"activeFlag", 1).over(win) === 0)
val avgprice = (lag($"price", 1).over(windw) + lag($"price", 2).over(windw) + lead($"price", 1).over(windw) + lead($"price", 2).over(windw)) / 4.0
val expr = when(cond, avgprice).otherwise($"price")
val finalresult = df.withColumn("newPrice", expr)
我唯一要弄清楚的是,如果 activeflag = 1 存在于上面的行中,那么我想在 activeflag=1 的行上方多走一行。如果我找到解决方法来获取此内容,我将尝试更新。