如何根据前一天计算当前日值,在pyspark?



我有以下场景。在第11天,我有余额,它每天都在减去交易。我需要在一天开始和结束的时候计算一下余额。我正在尝试使用延迟功能。

遵循sql中的逻辑,记住它创建一个遍历整个月的循环,总是取前一个月和当前日期

if month('day')=1 then
do;
begin_day = saldo + trans - vl_dis
+ vl_car + vl_ret;
end_day = saldo ;
end;
IF month('day')>1 then
do;
begin_day = end_day;
end_day = begin_day - trans
+ vl_dis - vl_car - vl_ret;
end;

输出预期:

+--------+--------+------+------+------+------+---------+--------+----------+
| key    |   saldo| trans|vl_dis|vl_car|vl_ret|begin_day| end_day|       day|
+--------+--------+------+------+------+------+---------+--------+----------+
|123     |   100.0|   1.0|   2.0|   0.0|   0.0|     99.0|   100.0|2022-02-01|
|123     |     0.0|   1.0|   0.0|   0.0|   0.0|    100.0|    99.0|2022-02-02|
|123     |     0.0|   1.0|   0.0|   0.0|   0.0|     99.0|    98.0|2022-02-03|
|123     |     0.0|   1.0|   0.0|   0.0|   0.0|     98.0|    97.0|2022-02-04|
|123     |     0.0|   1.0|   2.0|   0.0|   0.0|     97.0|    98.0|2022-02-05|
|123     |     0.0|   1.0|   0.0|   0.0|   0.0|     98.0|    97.0|2022-02-06|
|123     |     0.0|   1.0|   0.0|   0.0|   0.0|     97.0|    96.0|2022-02-07|
|123     |     0.0|   1.0|   2.0|   0.0|   0.0|     96.0|    97.0|2022-02-08|
|123     |     0.0|   1.0|   0.0|   0.0|   0.0|     97.0|    96.0|2022-02-09|
+--------+--------+------+------+------+------+---------+--------+----------+

乍一看,这像是一个递归问题,因为我们必须不断回顾前一行的值来计算下一个值,而pyspark不支持递归cte或表

但是当仔细观察时,这个问题可以通过运行sum和延迟来解决,所以下面首先准备上面的数据,不需要start_day和end_day值,这些值将根据所需的导入计算

from io import StringIO
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
s=StringIO("""
key|saldo|trans|vl_dis|vl_car|vl_ret|day
123|100.0|1.0|2.0|0.0|0.0|2022-02-01
123|0.0|1.0|0.0|0.0|0.0|2022-02-02
123|0.0|1.0|0.0|0.0|0.0|2022-02-03
123|0.0|1.0|0.0|0.0|0.0|2022-02-04
123|0.0|1.0|2.0|0.0|0.0|2022-02-05
123|0.0|1.0|0.0|0.0|0.0|2022-02-06
123|0.0|1.0|0.0|0.0|0.0|2022-02-07
123|0.0|1.0|2.0|0.0|0.0|2022-02-08
123|0.0|1.0|0.0|0.0|0.0|2022-02-09""")
dfp=pd.read_csv(s,sep='|')
dfs=spark.createDataFrame(dfp)

现在我们将在下面创建具有所需的start_day和end_day列的数据框

r_w=Window.partitionBy("month").orderBy("dom")
dfs.withColumn("month",F.month(F.col("day"))).
withColumn("dom",F.date_format(F.col("day"), "d")).
withColumn("trans_sum",F.sum("trans").over(r_w)-F.first("trans").over(r_w)).
withColumn("vl_dis_sum",F.sum("vl_dis").over(r_w)-F.first("vl_dis").over(r_w)).
withColumn("vl_car_sum",F.sum("vl_car").over(r_w)-F.first("vl_car").over(r_w)).
withColumn("vl_ret_sum",F.sum("vl_ret").over(r_w)-F.first("vl_ret").over(r_w)).
withColumn("start_day",F.when(F.col("dom")==1,F.col("saldo")+ F.col("trans")-F.col("vl_dis")+F.col("vl_car")+F.col("vl_ret")).otherwise(F.first("saldo").over(r_w)- F.lag(F.col("trans_sum"),1).over(r_w)+F.lag(F.col("vl_dis_sum"),1).over(r_w)-F.lag(F.col("vl_car_sum"),1).over(r_w)-F.lag(F.col("vl_ret_sum"),1).over(r_w))).
withColumn("end_day",F.when(F.col("dom")==1,F.col("saldo")).otherwise(F.first("saldo").over(r_w)- F.col("trans_sum")+F.col("vl_dis_sum")-F.col("vl_car_sum")-F.col("vl_ret_sum"))).
drop("month","dom","trans_sum","vl_car_sum","vl_dis_sum","vl_ret_sum").
show()
#output
+---+-----+-----+------+------+------+----------+---------+-------+
|key|saldo|trans|vl_dis|vl_car|vl_ret|       day|start_day|end_day|
+---+-----+-----+------+------+------+----------+---------+-------+
|123|100.0|  1.0|   2.0|   0.0|   0.0|2022-02-01|     99.0|  100.0|
|123|  0.0|  1.0|   0.0|   0.0|   0.0|2022-02-02|    100.0|   99.0|
|123|  0.0|  1.0|   0.0|   0.0|   0.0|2022-02-03|     99.0|   98.0|
|123|  0.0|  1.0|   0.0|   0.0|   0.0|2022-02-04|     98.0|   97.0|
|123|  0.0|  1.0|   2.0|   0.0|   0.0|2022-02-05|     97.0|   98.0|
|123|  0.0|  1.0|   0.0|   0.0|   0.0|2022-02-06|     98.0|   97.0|
|123|  0.0|  1.0|   0.0|   0.0|   0.0|2022-02-07|     97.0|   96.0|
|123|  0.0|  1.0|   2.0|   0.0|   0.0|2022-02-08|     96.0|   97.0|
|123|  0.0|  1.0|   0.0|   0.0|   0.0|2022-02-09|     97.0|   96.0|
+---+-----+-----+------+------+------+----------+---------+-------+

最新更新