我有以下场景。在第11天,我有余额,它每天都在减去交易。我需要在一天开始和结束的时候计算一下余额。我正在尝试使用延迟功能。
遵循sql中的逻辑,记住它创建一个遍历整个月的循环,总是取前一个月和当前日期
if month('day')=1 then
do;
begin_day = saldo + trans - vl_dis
+ vl_car + vl_ret;
end_day = saldo ;
end;
IF month('day')>1 then
do;
begin_day = end_day;
end_day = begin_day - trans
+ vl_dis - vl_car - vl_ret;
end;
输出预期:
+--------+--------+------+------+------+------+---------+--------+----------+
| key | saldo| trans|vl_dis|vl_car|vl_ret|begin_day| end_day| day|
+--------+--------+------+------+------+------+---------+--------+----------+
|123 | 100.0| 1.0| 2.0| 0.0| 0.0| 99.0| 100.0|2022-02-01|
|123 | 0.0| 1.0| 0.0| 0.0| 0.0| 100.0| 99.0|2022-02-02|
|123 | 0.0| 1.0| 0.0| 0.0| 0.0| 99.0| 98.0|2022-02-03|
|123 | 0.0| 1.0| 0.0| 0.0| 0.0| 98.0| 97.0|2022-02-04|
|123 | 0.0| 1.0| 2.0| 0.0| 0.0| 97.0| 98.0|2022-02-05|
|123 | 0.0| 1.0| 0.0| 0.0| 0.0| 98.0| 97.0|2022-02-06|
|123 | 0.0| 1.0| 0.0| 0.0| 0.0| 97.0| 96.0|2022-02-07|
|123 | 0.0| 1.0| 2.0| 0.0| 0.0| 96.0| 97.0|2022-02-08|
|123 | 0.0| 1.0| 0.0| 0.0| 0.0| 97.0| 96.0|2022-02-09|
+--------+--------+------+------+------+------+---------+--------+----------+
乍一看,这像是一个递归问题,因为我们必须不断回顾前一行的值来计算下一个值,而pyspark不支持递归cte或表
但是当仔细观察时,这个问题可以通过运行sum和延迟来解决,所以下面首先准备上面的数据,不需要start_day和end_day值,这些值将根据所需的导入计算
from io import StringIO
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
s=StringIO("""
key|saldo|trans|vl_dis|vl_car|vl_ret|day
123|100.0|1.0|2.0|0.0|0.0|2022-02-01
123|0.0|1.0|0.0|0.0|0.0|2022-02-02
123|0.0|1.0|0.0|0.0|0.0|2022-02-03
123|0.0|1.0|0.0|0.0|0.0|2022-02-04
123|0.0|1.0|2.0|0.0|0.0|2022-02-05
123|0.0|1.0|0.0|0.0|0.0|2022-02-06
123|0.0|1.0|0.0|0.0|0.0|2022-02-07
123|0.0|1.0|2.0|0.0|0.0|2022-02-08
123|0.0|1.0|0.0|0.0|0.0|2022-02-09""")
dfp=pd.read_csv(s,sep='|')
dfs=spark.createDataFrame(dfp)
现在我们将在下面创建具有所需的start_day和end_day列的数据框
r_w=Window.partitionBy("month").orderBy("dom")
dfs.withColumn("month",F.month(F.col("day"))).
withColumn("dom",F.date_format(F.col("day"), "d")).
withColumn("trans_sum",F.sum("trans").over(r_w)-F.first("trans").over(r_w)).
withColumn("vl_dis_sum",F.sum("vl_dis").over(r_w)-F.first("vl_dis").over(r_w)).
withColumn("vl_car_sum",F.sum("vl_car").over(r_w)-F.first("vl_car").over(r_w)).
withColumn("vl_ret_sum",F.sum("vl_ret").over(r_w)-F.first("vl_ret").over(r_w)).
withColumn("start_day",F.when(F.col("dom")==1,F.col("saldo")+ F.col("trans")-F.col("vl_dis")+F.col("vl_car")+F.col("vl_ret")).otherwise(F.first("saldo").over(r_w)- F.lag(F.col("trans_sum"),1).over(r_w)+F.lag(F.col("vl_dis_sum"),1).over(r_w)-F.lag(F.col("vl_car_sum"),1).over(r_w)-F.lag(F.col("vl_ret_sum"),1).over(r_w))).
withColumn("end_day",F.when(F.col("dom")==1,F.col("saldo")).otherwise(F.first("saldo").over(r_w)- F.col("trans_sum")+F.col("vl_dis_sum")-F.col("vl_car_sum")-F.col("vl_ret_sum"))).
drop("month","dom","trans_sum","vl_car_sum","vl_dis_sum","vl_ret_sum").
show()
#output
+---+-----+-----+------+------+------+----------+---------+-------+
|key|saldo|trans|vl_dis|vl_car|vl_ret| day|start_day|end_day|
+---+-----+-----+------+------+------+----------+---------+-------+
|123|100.0| 1.0| 2.0| 0.0| 0.0|2022-02-01| 99.0| 100.0|
|123| 0.0| 1.0| 0.0| 0.0| 0.0|2022-02-02| 100.0| 99.0|
|123| 0.0| 1.0| 0.0| 0.0| 0.0|2022-02-03| 99.0| 98.0|
|123| 0.0| 1.0| 0.0| 0.0| 0.0|2022-02-04| 98.0| 97.0|
|123| 0.0| 1.0| 2.0| 0.0| 0.0|2022-02-05| 97.0| 98.0|
|123| 0.0| 1.0| 0.0| 0.0| 0.0|2022-02-06| 98.0| 97.0|
|123| 0.0| 1.0| 0.0| 0.0| 0.0|2022-02-07| 97.0| 96.0|
|123| 0.0| 1.0| 2.0| 0.0| 0.0|2022-02-08| 96.0| 97.0|
|123| 0.0| 1.0| 0.0| 0.0| 0.0|2022-02-09| 97.0| 96.0|
+---+-----+-----+------+------+------+----------+---------+-------+