Spark: calculate a column based on the previous row's old value

I have a dataframe as shown below:

+----------------+---------------+----------+------------------+-------------+
|Transaction_date|     Added date|coupon_id |cart_value        | coupon_value|
+----------------+---------------+----------+------------------+-------------+
|2018-01-16      |2018-02-01     |2390324796|12.5              |1.8          |
|2018-01-16      |2018-01-04     |1100111212|1.0               |2.0          |
|2018-01-19      |2018-01-04     |1100111212|2.5               |2.0          |
+----------------+---------------+----------+------------------+-------------+

I need to apply the coupon value to the cart value and update the coupon balance and the redeemed value automatically. I only want to do this when the Transaction_date is greater than the coupon's Added date.

Logic

UpdatedBalance = (coupon_value - cart_value); if cart_value is larger, only the available coupon value is redeemed.

Redeemed = the amount redeemed in the given transaction

I want something like this:

+----------------+---------------+----------+------------------+-------------+--------------+--------+
|Transaction_date|     Added date|coupon_id |cart_value        | coupon_value|UpdatedBalance|Redeemed|
+----------------+---------------+----------+------------------+-------------+--------------+--------+
|2018-01-16      |2018-02-01     |2390324796|12.5              |1.8          |0             |0       |
|2018-01-16      |2018-01-04     |1100111212|1.0               |2.0          |1             |1       |
|2018-01-19      |2018-01-04     |1100111212|2.5               |2.0          |0             |1       |
+----------------+---------------+----------+------------------+-------------+--------------+--------+

I am trying to do this in Spark Scala.

Assuming

the window is over the whole table and ordered by Added_date in descending order, the following works:

scala> val df =Seq(("2018-01-16","2018-02-01",2390324796L,12.5,1.8),("2018-01-16","2018-01-04",1100111212L,1.0,2.0),("2018-01-19","2018-01-04",1100111212L,2.5,2.0)).toDF("Transaction_date","Added_date","coupon_id","cart_value","coupon_value")
df: org.apache.spark.sql.DataFrame = [Transaction_date: string, Added_date: string ... 3 more fields]
scala> df.show(false)
+----------------+----------+----------+----------+------------+
|Transaction_date|Added_date|coupon_id |cart_value|coupon_value|
+----------------+----------+----------+----------+------------+
|2018-01-16      |2018-02-01|2390324796|12.5      |1.8         |
|2018-01-16      |2018-01-04|1100111212|1.0       |2.0         |
|2018-01-19      |2018-01-04|1100111212|2.5       |2.0         |
+----------------+----------+----------+----------+------------+

scala> val df2 = df.withColumn("UpdatedBalance",when('coupon_value>'cart_value,'coupon_value-'cart_value).otherwise(0))
df2: org.apache.spark.sql.DataFrame = [Transaction_date: string, Added_date: string ... 4 more fields]
scala> df2.show(false)
+----------------+----------+----------+----------+------------+--------------+
|Transaction_date|Added_date|coupon_id |cart_value|coupon_value|UpdatedBalance|
+----------------+----------+----------+----------+------------+--------------+
|2018-01-16      |2018-02-01|2390324796|12.5      |1.8         |0.0           |
|2018-01-16      |2018-01-04|1100111212|1.0       |2.0         |1.0           |
|2018-01-19      |2018-01-04|1100111212|2.5       |2.0         |0.0           |
+----------------+----------+----------+----------+------------+--------------+
scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> df2.withColumn("Redeemed",sum('UpdatedBalance).over(Window.orderBy('Added_date.desc))).show(false)
19/01/03 10:31:50 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----------------+----------+----------+----------+------------+--------------+--------+
|Transaction_date|Added_date|coupon_id |cart_value|coupon_value|UpdatedBalance|Redeemed|
+----------------+----------+----------+----------+------------+--------------+--------+
|2018-01-16      |2018-02-01|2390324796|12.5      |1.8         |0.0           |0.0     |
|2018-01-16      |2018-01-04|1100111212|1.0       |2.0         |1.0           |1.0     |
|2018-01-19      |2018-01-04|1100111212|2.5       |2.0         |0.0           |1.0     |
+----------------+----------+----------+----------+------------+--------------+--------+

scala>
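
If you also need the running sum per coupon_id and want the coupon applied only when the Transaction_date is later than the Added_date (as the question asks), a minimal sketch along the same lines could look like the following. The names eligible and byCoupon are my own, it reuses the df defined above, and it should be run in the same spark-shell session so the symbol syntax resolves; output is not shown here.

// Sketch only: restrict redemption to Transaction_date > Added_date and
// compute the running sum per coupon_id instead of over the whole table.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// 0 unless the transaction happened after the coupon was added
// and the coupon is worth more than the cart.
val eligible = when(
  to_date('Transaction_date) > to_date('Added_date) && 'coupon_value > 'cart_value,
  'coupon_value - 'cart_value
).otherwise(lit(0.0))

// Running sum per coupon, most recently added rows first.
val byCoupon = Window.partitionBy('coupon_id).orderBy('Added_date.desc)

df.withColumn("UpdatedBalance", eligible)
  .withColumn("Redeemed", sum('UpdatedBalance).over(byCoupon))
  .show(false)

Partitioning by coupon_id also avoids the "No Partition Defined for Window operation" warning seen above, since the data no longer has to be moved to a single partition.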
