Get the last value from the previous partition in PySpark



I have this dataframe:

+------+-------------------+------+----------+------+
|catulz|             hatulz|ccontr|    dmovto|amount|
+------+-------------------+------+----------+------+
|     I|1900-01-01 16:00:00|   123|2022-09-01|300.00|
|     U|1900-01-01 17:00:00|   123|2022-09-02|500.00|
|     I|1900-01-01 16:00:00|   123|2022-09-02|150.00|
|     U|1900-01-01 18:00:00|   123|2022-09-03|500.00|
|     I|1900-01-01 16:00:00|   123|2022-09-03|150.00|
|     I|1900-01-01 16:00:00|   123|2022-09-04|150.00|
|     U|1900-01-01 19:00:00|   123|2022-09-04|150.00|
|     I|1900-01-01 16:00:00|   123|2022-09-05|150.00|
|     I|1900-01-01 16:00:00|   123|2022-09-06|150.00|
|     I|1900-01-01 16:00:00|   123|2022-09-07|150.00|
+------+-------------------+------+----------+------+

I need to compute the amount using this rule:

  • If there is only one row for a dmovto day, take that row's amount
  • When there is more than one row on the same dmovto day:
    • look at the PREVIOUS partition by ccontr + dmovto; if it has a "U", take that amount. Otherwise, use the "I" amount

Like this:

+------+-------------------+------+----------+------+----------+
|catulz|             hatulz|ccontr|    dmovto|amount|new_amount|
+------+-------------------+------+----------+------+----------+
|     I|1900-01-01 16:00:00|   123|2022-09-01|300.00|  300.00  |
|     U|1900-01-01 17:00:00|   123|2022-09-02|500.00|  300.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-02|150.00|  300.00  |
|     U|1900-01-01 18:00:00|   123|2022-09-03|500.00|  500.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-03|150.00|  500.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-04|150.00|  500.00  |
|     U|1900-01-01 19:00:00|   123|2022-09-04|150.00|  500.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-05|150.00|  150.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-06|150.00|  150.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-07|150.00|  150.00  |
+------+-------------------+------+----------+------+----------+

PS:";U〃;来自";"更新";,这是首要任务。如果我通过";ccontr"+"dmovto";并按";hatulz";,它可以工作

I tried creating a Window.partitionBy(["ccontr","dmovto"]).orderBy("hatulz") and using lag and last, but without success.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DecimalType, TimestampType
from datetime import datetime
from decimal import Decimal
from pyspark.sql.window import Window
from pyspark.sql.functions import avg,col
spark = SparkSession.builder.master("local[4]").appName("tests").getOrCreate()
vdata = [
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-01','%Y-%m-%d'),Decimal(300)),
('U',datetime.strptime('17:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-02','%Y-%m-%d'),Decimal(500)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-02','%Y-%m-%d'),Decimal(150)),
('U',datetime.strptime('18:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-03','%Y-%m-%d'),Decimal(500)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-03','%Y-%m-%d'),Decimal(150)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-04','%Y-%m-%d'),Decimal(150)),
('U',datetime.strptime('19:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-04','%Y-%m-%d'),Decimal(150)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-05','%Y-%m-%d'),Decimal(150)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-06','%Y-%m-%d'),Decimal(150)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-07','%Y-%m-%d'),Decimal(150)),
]
schema = StructType([
StructField("catulz",StringType(),False),
StructField("hatulz",TimestampType(),False),
StructField("ccontr",IntegerType(),False),
StructField("dmovto",DateType(),False),
StructField("amount",DecimalType(10,2),False)])
df = spark.createDataFrame(vdata,schema)
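
Roughly, the attempt was something like this (a sketch only; the prev_amount/last_amount names are just illustrative). Because the window is partitioned by ccontr + dmovto, it only ever sees rows of the same day, so lag/last never reach the previous day's partition:

w_try = Window.partitionBy(["ccontr","dmovto"]).orderBy("hatulz")
df_try = (df
    .withColumn("prev_amount", lag("amount").over(w_try))
    .withColumn("last_amount", last("amount").over(w_try)))
df_try.show()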
-----------
Solution 
--------------
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DecimalType, TimestampType
from datetime import datetime
from decimal import Decimal
vdata = [
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-01','%Y-%m-%d'),Decimal(300)),
('U',datetime.strptime('17:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-02','%Y-%m-%d'),Decimal(500)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-02','%Y-%m-%d'),Decimal(150)),
('U',datetime.strptime('18:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-03','%Y-%m-%d'),Decimal(500)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-03','%Y-%m-%d'),Decimal(150)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-04','%Y-%m-%d'),Decimal(150)),
('U',datetime.strptime('19:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-04','%Y-%m-%d'),Decimal(150)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-05','%Y-%m-%d'),Decimal(150)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-06','%Y-%m-%d'),Decimal(150)),
('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-07','%Y-%m-%d'),Decimal(150)),
]
schema = StructType([
StructField("catulz",StringType(),False),
StructField("hatulz",TimestampType(),False),
StructField("ccontr",IntegerType(),False),
StructField("dmovto",DateType(),False),
StructField("amount",DecimalType(10,2),False)])
df = spark.createDataFrame(vdata,schema)

Actual implementation


import pyspark.sql.functions as F
from pyspark.sql.window import Window

Window_spec = Window.partitionBy(["ccontr", "dmovto"])
window_previous_partition = Window_spec.orderBy(F.asc("hatulz")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# keep only the ("U", amount) pairs from the collected (catulz, amount) pairs of the day
previous_partition_value_filter = F.filter("previous_amount_values_lst", lambda x: x['0'] == "U")
previous_partition_value_cond_ = F.when(F.size("previous_amount_values_flt") == 1, F.col("previous_amount_values_flt")).otherwise(F.col("previous_amount_values_lst"))

# all earlier rows of the same ccontr, ordered by dmovto (current row excluded)
window_spec_previous_amount_values = Window.partitionBy(["ccontr"]).orderBy(["dmovto"]).rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
dmtvo_distinct_amount_values_cond_ = F.arrays_zip(
    F.collect_list("dmovto").over(window_spec_previous_amount_values),
    F.collect_list(F.col("final_previous_partition_values")[0]['1']).over(window_spec_previous_amount_values))
final_amount_cond_ = F.when(F.col("count_rows_per_partition") == 2, F.col("max_previous_amount_part").getItem("1")).otherwise(F.col("amount"))

df_fnl = df.withColumn(
    "previous_amount_values_lst",
    F.arrays_zip(F.collect_list("catulz").over(window_previous_partition),
                 F.collect_list("amount").over(window_previous_partition)))

df_fnl_flt = (df_fnl
    .withColumn("previous_amount_values_flt", previous_partition_value_filter)
    .withColumn("final_previous_partition_values", previous_partition_value_cond_)
    .withColumn("count_rows_per_partition", F.count("*").over(Window_spec))
    .withColumn("dmtvo_distinct_amount_values", dmtvo_distinct_amount_values_cond_)
    .withColumn("max_previous_amount_part", F.array_max(F.filter("dmtvo_distinct_amount_values", lambda x: x.getItem('0') < F.col("dmovto"))))
    .withColumn("final_amount", final_amount_cond_)
    .drop("previous_amount_values_lst", "previous_amount_values_flt",
          "dmtvo_distinct_amount_values", "count_rows_per_partition",
          "final_previous_partition_values", "max_previous_amount_part"))

df_fnl_flt.show(10,0)
-----------
final output 
-----------
+------+-------------------+------+----------+------+------------+
|catulz|hatulz             |ccontr|dmovto    |amount|final_amount|
+------+-------------------+------+----------+------+------------+
|I     |1900-01-01 16:00:00|123   |2022-09-01|300.00|300.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-02|150.00|300.00      |
|U     |1900-01-01 17:00:00|123   |2022-09-02|500.00|300.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-03|150.00|500.00      |
|U     |1900-01-01 18:00:00|123   |2022-09-03|500.00|500.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-04|150.00|500.00      |
|U     |1900-01-01 19:00:00|123   |2022-09-04|150.00|500.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-05|150.00|150.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-06|150.00|150.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-07|150.00|150.00      |
+------+-------------------+------+----------+------+------------+

First, create a pivoted df on the catulz column, picking the amount under each pivoted column:

pvt_df = (df
    .groupby('ccontr', 'dmovto', 'amount')
    .pivot("catulz")
    .agg(first("amount"))
    .select(col('ccontr'),
            col('dmovto').alias('lag_dmovto'),
            col('I'),
            col('U'))
    .groupby('ccontr', 'lag_dmovto')
    .agg(sum("I").alias('I'), sum("U").alias('U'))
    .orderBy('lag_dmovto'))

pvt_df.show()
# +------+----------+------+------+
# |ccontr|lag_dmovto|     I|     U|
# +------+----------+------+------+
# |   123|2022-09-01|300.00|  null|
# |   123|2022-09-02|150.00|500.00|
# |   123|2022-09-03|150.00|500.00|
# |   123|2022-09-04|150.00|150.00|
# |   123|2022-09-05|150.00|  null|
# |   123|2022-09-06|150.00|  null|
# |   123|2022-09-07|150.00|  null|
# +------+----------+------+------+

Create a df containing each date and its corresponding previous day (the lag column):

dates_df = (df.groupBy('dmovto').agg(max('dmovto'))
    .withColumn('lag_dmovto', F.lag(F.col('dmovto')).over(Window.partitionBy().orderBy('dmovto')))
    .select('dmovto', 'lag_dmovto'))
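
With the sample data, dates_df should come out roughly as below (each day paired with the preceding day, null for the first day):

dates_df.show()
# +----------+----------+
# |    dmovto|lag_dmovto|
# +----------+----------+
# |2022-09-01|      null|
# |2022-09-02|2022-09-01|
# |2022-09-03|2022-09-02|
# |2022-09-04|2022-09-03|
# |2022-09-05|2022-09-04|
# |2022-09-06|2022-09-05|
# |2022-09-07|2022-09-06|
# +----------+----------+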

Create the count_dmovto column. If count_dmovto is 1, new_amount is just the row's amount. Otherwise:

  • join with dates_df to find the previous day
  • join with the pivoted df to get the I and U amounts
  • now that we have all the columns, apply the when logic and select the appropriate columns
(df
    .withColumn('count_dmovto', count(F.col('dmovto')).over(Window.partitionBy('dmovto')))
    .join(dates_df, 'dmovto', 'left')
    .join(pvt_df, ['ccontr', 'lag_dmovto'], 'left')
    .withColumn('new_amount', when(col('count_dmovto') == 1, col('amount'))
                .when(col('U').isNotNull(), col('U'))
                .otherwise(col('I')))
    .select('catulz', 'ccontr', 'dmovto', 'amount', 'new_amount')
    .show())
# +------+------+----------+------+----------+
# |catulz|ccontr|    dmovto|amount|new_amount|
# +------+------+----------+------+----------+
# |     I|   123|2022-09-01|300.00|    300.00|
# |     I|   123|2022-09-02|150.00|    300.00|
# |     U|   123|2022-09-02|500.00|    300.00|
# |     I|   123|2022-09-03|150.00|    500.00|
# |     U|   123|2022-09-03|500.00|    500.00|
# |     I|   123|2022-09-04|150.00|    500.00|
# |     U|   123|2022-09-04|150.00|    500.00|
# |     I|   123|2022-09-05|150.00|    150.00|
# |     I|   123|2022-09-06|150.00|    150.00|
# |     I|   123|2022-09-07|150.00|    150.00|
# +------+------+----------+------+----------+
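
For comparison, the same rule can also be sketched without the pivot, by aggregating one row per ccontr + dmovto (preferring the "U" amount over the "I" amount), lagging it by one day, and joining back. This is only an alternative sketch, not the answer above; the names days, day_amount, day_rows and prev_day_amount are illustrative:

# one row per (ccontr, dmovto): the day's preferred amount (U wins over I) and its row count
days = (df.groupBy("ccontr", "dmovto")
          .agg(F.coalesce(F.max(F.when(F.col("catulz") == "U", F.col("amount"))),
                          F.max(F.when(F.col("catulz") == "I", F.col("amount")))).alias("day_amount"),
               F.count("*").alias("day_rows"))
          .withColumn("prev_day_amount",
                      F.lag("day_amount").over(Window.partitionBy("ccontr").orderBy("dmovto"))))

# single-row days keep their own amount; multi-row days take the previous day's amount
result = (df.join(days, ["ccontr", "dmovto"], "left")
            .withColumn("new_amount",
                        F.when(F.col("day_rows") == 1, F.col("amount"))
                         .otherwise(F.col("prev_day_amount"))))
result.select("catulz", "ccontr", "dmovto", "amount", "new_amount").show()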
