SparkSQL 在分组后从数据帧中提取前后的行



给定此数据帧 df

 +-----------+--------------------+-------------+-------+
|CustNumb   |        PurchaseDate|     price| activeFlag|
+-----------+--------------------+-------------+-------+
|          3|2013-07-17 00:00:...|         17.9|    0|
|          3|2013-08-27 00:00:...|        61.13|    0|
|          3|2013-08-28 00:00:...|        25.07|    1|
|          3|2013-08-29 00:00:...|        24.23|    0|
|          3|2013-09-06 00:00:...|         3.94|    0|
|         20|2013-02-28 00:00:...|       354.64|    0|
|         20|2013-04-07 00:00:...|         15.0|    0|
|         20|2013-05-10 00:00:...|        545.0|    0|
|         28|2013-02-17 00:00:...|        190.0|    0|
|         28|2013-04-08 00:00:...|         20.0|    0|
|         28|2013-04-16 00:00:...|         89.0|    0|
|         28|2013-05-18 00:00:...|        260.0|    0|
|         28|2013-06-06 00:00:...|       586.57|    1|
|         28|2013-06-09 00:00:...|        250.0|    0|

当它找到一个非活动标志"1"时,我想得到返回按购买日期订购前后 2 行的平均价格的结果。这是我正在寻找的结果:

+-----------+--------------------+-------------+-------+---------------+
|CustNumb   |        PurchaseDate|     price| activeFlag| OutputVal |
+-----------+--------------------+-------------+-------+------------+
|          3|2013-07-17 00:00:...|         17.9|    0|   17.9
|          3|2013-08-27 00:00:...|        61.13|    0|   61.13
|          3|2013-08-28 00:00:...|        25.07|    1|   26.8 (avg of 2 prices before and 2 after)
|          3|2013-08-29 00:00:...|        24.23|    0|   24.23
|          3|2013-09-06 00:00:...|         3.94|    0|   3.94
|         20|2013-02-28 00:00:...|       354.64|    0|   354.64
|         20|2013-04-07 00:00:...|         15.0|    0|   15.0
|         20|2013-05-10 00:00:...|        545.0|    0|   545.0
|         28|2013-02-17 00:00:...|        190.0|    0|   190.0
|         28|2013-04-08 00:00:...|         20.0|    0|   20.0
|         28|2013-04-16 00:00:...|         89.0|    0|   89.0
|         28|2013-05-18 00:00:...|        260.0|    0|   260.0
|         28|2013-06-06 00:00:...|       586.57|    1|   199.6 (avg of 2 prices before and 1 after)
|         28|2013-06-09 00:00:...|        250.0|    0|   250

在上面的 custNum 3 和 28 示例中,我有 activeFlag 1,因此我需要计算前后 2 行的平均值,如果它以相同的 custNumb 存在。

我正在考虑在数据帧上使用窗口函数,但由于我对 Spark 编程很陌生,因此无法获得任何好主意来解决这个问题

val w = Window.partitionBy("CustNumb").orderBy("PurchaseDate")

我如何实现这一点,它可以通过窗口功能或任何更好的方法来实现吗?

如果你已经有窗口,像这样的简单条件应该可以正常工作:

val cond = ($"activeFlag" === 1) && (lag($"activeFlag", 1).over(w) === 0)
// Windows covering rows before and after
val before = w.rowsBetween(-2, -1)
val after = w.rowsBetween(1, 2)
// Expression with sum of rows and number of rows 
val sumPrice = sum($"price").over(before) + sum($"price").over(after)
val countPrice = sum($"ones_").over(before) + sum($"ones_").over(after)
val expr = when(cond, sumPrice / countPrice).otherwise($"price")
df.withColumn("ones_", lit(1)).withColumn("outputVal", expr)

谢谢 Zero323。你真棒!!这是我基于您帮助的代码片段,我修改了以获取我正在寻找的数据结果:

 val windw = Window.partitionBy("CustNumb").orderBy("PurchaseDate")
 val cond = ($"activeFlag" === 1) //&& (lag($"activeFlag", 1).over(win) === 0)
 val avgprice = (lag($"price", 1).over(windw)  + lag($"price", 2).over(windw) + lead($"price", 1).over(windw)  + lead($"price", 2).over(windw)) / 4.0
 val expr = when(cond, avgprice).otherwise($"price")
 val finalresult = df.withColumn("newPrice", expr)

我唯一要弄清楚的是,如果 activeflag = 1 存在于上面的行中,那么我想在 activeflag=1 的行上方多走一行。如果我找到解决方法来获取此内容,我将尝试更新。

相关内容

  • 没有找到相关文章

最新更新