PySpark: aggregate function on a column with multiple conditions



I have two PySpark DataFrames, A and B:

A
GROUP |    date    | 
1    | 2021-02-01 |
1    | 2021-04-01 |
1    | 2021-07-23 | 
1    | 2021-07-30 | 
2    | 2021-02-01 |
2    | 2021-04-01 |
2    | 2021-07-23 | 
2    | 2021-07-30 | 

B
GROUP |    date    | val
1    | 2021-03-31 | 15
2    | 2021-03-31 | 25
2    | 2021-06-30 | 40
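For reference, the two frames can be built like this (a minimal sketch; it assumes an active SparkSession named spark and keeps the dates as 'yyyy-MM-dd' strings, which sort in the same order as real dates):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

A = spark.createDataFrame(
    [(1, "2021-02-01"), (1, "2021-04-01"), (1, "2021-07-23"), (1, "2021-07-30"),
     (2, "2021-02-01"), (2, "2021-04-01"), (2, "2021-07-23"), (2, "2021-07-30")],
    ["GROUP", "date"])
B = spark.createDataFrame(
    [(1, "2021-03-31", 15), (2, "2021-03-31", 25), (2, "2021-06-30", 40)],
    ["GROUP", "date", "val"])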

I want to join them so that, for each row of A, the new column last_reported_val is MAX(B.date) over the rows of B with B.date <= A.date and A.GROUP = B.GROUP (NULL if no such row exists), and val is the B.val belonging to that date. For example:

GROUP |    date    | last_reported_val |   val   |
1    | 2021-02-01 |        NULL       |   NULL  |
1    | 2021-04-01 |      2021-03-31   |   15    |
1    | 2021-07-23 |      2021-03-31   |   15    |
1    | 2021-07-30 |      2021-03-31   |   15    |
2    | 2021-02-01 |        NULL       |   NULL  |
2    | 2021-04-01 |      2021-03-31   |   25    |
2    | 2021-07-23 |      2021-06-30   |   40    |
2    | 2021-07-30 |      2021-06-30   |   40    |

In SQL I would do something like this:

SELECT A.group, A.date,
       (SELECT MAX(B.date) FROM B
         WHERE B.date <= A.date AND A.group = B.group) AS last_reported_val,
       B.val
FROM A
LEFT JOIN B
  ON A.group = B.group
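As an aside, running this through Spark SQL from PySpark would mean registering temp views and calling spark.sql. A sketch under two assumptions: an active SparkSession named spark, and GROUP backticked since it can collide with the SQL keyword. Because some Spark versions reject the non-equality correlated predicate in the scalar subquery, this variant expresses the same intent with a conditional join plus ROW_NUMBER:

A.createOrReplaceTempView("A")
B.createOrReplaceTempView("B")
spark.sql("""
    SELECT `GROUP`, date, last_reported_val, val
    FROM (
        SELECT A.`GROUP`, A.date,
               B.date AS last_reported_val, B.val,
               ROW_NUMBER() OVER (PARTITION BY A.`GROUP`, A.date
                                  ORDER BY B.date DESC) AS rn
        FROM A LEFT JOIN B
          ON A.`GROUP` = B.`GROUP` AND B.date <= A.date
    ) t
    WHERE rn = 1
""").show()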

How can I do this with the PySpark DataFrame API? I tried a join followed by a map, without success:

A.join(B, A['GROUP'] == B['GROUP'], 'left')
...
# This raises a _thread.lock pickling error: the lambda captures the DataFrame A,
# and DataFrames (tied to the driver's SparkSession) can't be serialized to executors
A.rdd.map(lambda r: (..., A.filter(
    (A['a.date'] == r['a.date']) & (A['group'] == r['group'])
).agg(max_('b.date')).collect()))

You can first join on your conditions, and then, wherever more than one date in B satisfies the condition for a given date in A, keep only the greatest B date by creating a window:

from pyspark.sql import functions as F, Window as W

# Left join on the group key plus the date condition, keeping every A row
o = (A.alias("A")
     .join(B.alias("B"),
           on=[F.col("A.GROUP") == F.col("B.GROUP"),
               F.col("B.date") <= F.col("A.date")],
           how="left")
     .select("A.*", F.col("B.date").alias("last_reported_val"), "B.val"))

# Keep only the latest matching B date per (GROUP, date); desc puts nulls last
w = W.partitionBy("GROUP", "date").orderBy(F.desc("last_reported_val"))
o.withColumn("Rnum", F.row_number().over(w)).filter("Rnum == 1").drop("Rnum").show()

+-----+----------+-----------------+----+
|GROUP|      date|last_reported_val| val|
+-----+----------+-----------------+----+
|    1|2021-02-01|             null|null|
|    1|2021-04-01|       2021-03-31|  15|
|    1|2021-07-23|       2021-03-31|  15|
|    1|2021-07-30|       2021-03-31|  15|
|    2|2021-02-01|             null|null|
|    2|2021-04-01|       2021-03-31|  25|
|    2|2021-07-23|       2021-06-30|  40|
|    2|2021-07-30|       2021-06-30|  40|
+-----+----------+-----------------+----+
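Not part of the original answer, but as a possible variation: the window can be replaced by a plain aggregation, relying on the fact that Spark orders structs field by field, so the max of a (date, val) struct carries the latest B.date together with its val (a sketch built on the same A and B):

from pyspark.sql import functions as F

alt = (A.alias("A")
       .join(B.alias("B"),
             on=[F.col("A.GROUP") == F.col("B.GROUP"),
                 F.col("B.date") <= F.col("A.date")],
             how="left")
       .groupBy("A.GROUP", "A.date")
       # structs compare field by field, so the max struct holds the latest B.date
       .agg(F.max(F.struct("B.date", "B.val")).alias("latest"))
       .select("GROUP", "date",
               F.col("latest.date").alias("last_reported_val"),
               F.col("latest.val").alias("val")))
alt.show()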