Spark SQL join on id to the nearest date



I have the two tables below and I am trying to join them on id and on the weekly_dt date closest to the ingest_date column.

In standard ANSI SQL I would normally use a correlated subquery limited to one result per row so there is no aggregation error, but doing this in standard Spark SQL gives me the following error:
AnalysisException: Correlated scalar subqueries must be aggregated: GlobalLimit 1

Setup
import pandas as pd 
import numpy as np 
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
np.random.seed(25)
A1 = [('A1', i.date(), np.random.randint(0,50)) for i in pd.date_range('01 Jan 2021', '21 Jan 2021',freq='D')]
A2 = [('A2', i.date(), np.random.randint(0,50)) for i in pd.date_range('01 Jan 2021', '21 Jan 2021',freq='D')]

df_a = spark.createDataFrame(A1  + A2, ['id','ingest_date','amt'])
weekly_scores = [
('A1', pd.Timestamp('01 Jan 2021').date(), '0.5'),
('A1', pd.Timestamp('08 Jan 2021').date(), '0.3'),
('A1', pd.Timestamp('15 Jan 2021').date(), '0.8'),
('A1', pd.Timestamp('22 Jan 2021').date(), '0.6'),
('A2', pd.Timestamp('01 Jan 2021').date(), '0.6'),
('A2', pd.Timestamp('08 Jan 2021').date(), '0.1'),
('A2', pd.Timestamp('15 Jan 2021').date(), '0.9'),
('A2', pd.Timestamp('22 Jan 2021').date(), '0.3'),
]
df_b = spark.createDataFrame(weekly_scores, ['id','weekly_dt','score'])
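
The SQL snippets further down reference the tables as a and b, so the two DataFrames are presumably registered as temp views, e.g.:

# assumed step (not shown in the original post): register the DataFrames
# as temp views so spark.sql can reference them as `a` and `b`
df_a.createOrReplaceTempView('a')
df_b.createOrReplaceTempView('b')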


df_a.show()
+---+-----------+---+
| id|ingest_date|amt|
+---+-----------+---+
| A1| 2021-01-01| 26|
| A1| 2021-01-02|  1|
| A1| 2021-01-03|  0|
| A1| 2021-01-04| 31|
| A1| 2021-01-05| 41|
| A1| 2021-01-06| 46|
| A1| 2021-01-07| 11|
| A1| 2021-01-08|  0|
| A1| 2021-01-09| 14|
| A1| 2021-01-10|  5|
| A1| 2021-01-11|  0|
| A1| 2021-01-12| 35|
| A1| 2021-01-13|  5|
| A1| 2021-01-14| 43|
| A1| 2021-01-15| 18|
| A1| 2021-01-16| 31|
| A1| 2021-01-17| 44|
| A1| 2021-01-18| 25|
| A1| 2021-01-19| 47|
| A1| 2021-01-20| 36|
+---+-----------+---+

df_b.show()
+---+----------+-----+
| id| weekly_dt|score|
+---+----------+-----+
| A1|2021-01-01|  0.5|
| A1|2021-01-08|  0.3|
| A1|2021-01-15|  0.8|
| A1|2021-01-22|  0.6|
| A2|2021-01-01|  0.6|
| A2|2021-01-08|  0.1|
| A2|2021-01-15|  0.9|
| A2|2021-01-22|  0.3|
+---+----------+-----+

Expected output:

id ingest_date  amt  weekly_dt score
0    A1  2021-01-01   26 2021-01-01   0.5
4    A1  2021-01-02    1 2021-01-01   0.5
8    A1  2021-01-03    0 2021-01-01   0.5
12   A1  2021-01-04   31 2021-01-01   0.5
17   A1  2021-01-05   41 2021-01-08   0.3
21   A1  2021-01-06   46 2021-01-08   0.3
25   A1  2021-01-07   11 2021-01-08   0.3
29   A1  2021-01-08    0 2021-01-08   0.3
33   A1  2021-01-09   14 2021-01-08   0.3
37   A1  2021-01-10    5 2021-01-08   0.3
41   A1  2021-01-11    0 2021-01-08   0.3
46   A1  2021-01-12   35 2021-01-15   0.8
50   A1  2021-01-13    5 2021-01-15   0.8
54   A1  2021-01-14   43 2021-01-15   0.8
58   A1  2021-01-15   18 2021-01-15   0.8
62   A1  2021-01-16   31 2021-01-15   0.8
66   A1  2021-01-17   44 2021-01-15   0.8
70   A1  2021-01-18   25 2021-01-15   0.8
75   A1  2021-01-19   47 2021-01-22   0.6
79   A1  2021-01-20   36 2021-01-22   0.6
83   A1  2021-01-21   43 2021-01-22   0.6
84   A2  2021-01-01   32 2021-01-01   0.6
88   A2  2021-01-02   37 2021-01-01   0.6
92   A2  2021-01-03   11 2021-01-01   0.6
96   A2  2021-01-04   21 2021-01-01   0.6
101  A2  2021-01-05   29 2021-01-08   0.1
105  A2  2021-01-06   48 2021-01-08   0.1
109  A2  2021-01-07   12 2021-01-08   0.1
113  A2  2021-01-08   40 2021-01-08   0.1
117  A2  2021-01-09   30 2021-01-08   0.1
121  A2  2021-01-10   28 2021-01-08   0.1
125  A2  2021-01-11   41 2021-01-08   0.1
130  A2  2021-01-12   12 2021-01-15   0.9
134  A2  2021-01-13   10 2021-01-15   0.9
138  A2  2021-01-14   10 2021-01-15   0.9
142  A2  2021-01-15   31 2021-01-15   0.9
146  A2  2021-01-16   13 2021-01-15   0.9
150  A2  2021-01-17   31 2021-01-15   0.9
154  A2  2021-01-18   11 2021-01-15   0.9
159  A2  2021-01-19   15 2021-01-22   0.3
163  A2  2021-01-20   18 2021-01-22   0.3
167  A2  2021-01-21   49 2021-01-22   0.3

Spark Query

SELECT
    a.id,
    a.ingest_date,
    a.amt,
    b.weekly_dt,
    b.score
FROM a
LEFT JOIN b
    ON a.id = b.id
    AND b.weekly_dt = (
        SELECT b2.weekly_dt
        FROM b b2
        WHERE b2.id = a.id
        ORDER BY ABS(DATEDIFF(a.ingest_date, b2.weekly_dt)) ASC
        LIMIT 1
    )

Edit:

I know I can create a window and use dense_rank() to rank the results, but I would like to know whether this is the best approach?

from pyspark.sql import functions as F
from pyspark.sql import Window

# join on id only, then compute the absolute day difference per candidate row
s = spark.sql("""
    SELECT
        a.id,
        a.ingest_date,
        a.amt,
        b.weekly_dt,
        b.score
    FROM a
    LEFT JOIN b
        ON b.id = a.id
""").withColumn(
    'delta',
    F.abs(F.datediff(F.col('ingest_date'), F.col('weekly_dt')))
)

# keep the weekly_dt with the smallest delta per (id, ingest_date)
s.withColumn(
    't',
    F.dense_rank().over(
        Window.partitionBy('id', 'ingest_date').orderBy(F.asc('delta'))
    )
).filter('t == 1').drop('t', 'delta').show()
id ingest_date  amt   weekly_dt score
0   A2  2021-01-01   32  2021-01-01   0.6
1   A2  2021-01-02   37  2021-01-01   0.6
2   A2  2021-01-03   11  2021-01-01   0.6
3   A2  2021-01-04   21  2021-01-01   0.6
4   A2  2021-01-05   29  2021-01-08   0.1
5   A2  2021-01-06   48  2021-01-08   0.1
6   A2  2021-01-07   12  2021-01-08   0.1
7   A2  2021-01-08   40  2021-01-08   0.1
8   A2  2021-01-09   30  2021-01-08   0.1
9   A2  2021-01-10   28  2021-01-08   0.1
10  A2  2021-01-11   41  2021-01-08   0.1
11  A2  2021-01-12   12  2021-01-15   0.9
12  A2  2021-01-13   10  2021-01-15   0.9
13  A2  2021-01-14   10  2021-01-15   0.9
14  A2  2021-01-15   31  2021-01-15   0.9

I would replace the subquery with LIMIT by a window function:

from pyspark.sql import functions as F
from pyspark.sql import Window as W

df = df_a.join(df_b, on="id")
df = (
    df.withColumn(
        "rnk",
        F.row_number().over(
            W.partitionBy("id", "ingest_date").orderBy(
                F.abs(F.datediff("ingest_date", "weekly_dt"))
            )
        ),
    )
    .where("rnk = 1")
    .drop("rnk")
)
df.show()
df.show()
+---+-----------+---+----------+-----+                                          
| id|ingest_date|amt| weekly_dt|score|
+---+-----------+---+----------+-----+
| A2| 2021-01-01| 31|2021-01-01|  0.6|
| A2| 2021-01-02| 48|2021-01-01|  0.6|
| A2| 2021-01-03| 47|2021-01-01|  0.6|
| A2| 2021-01-04|  9|2021-01-01|  0.6|
| A2| 2021-01-05| 16|2021-01-08|  0.1|
| A2| 2021-01-06| 44|2021-01-08|  0.1|
| A2| 2021-01-07| 45|2021-01-08|  0.1|
| A2| 2021-01-08| 21|2021-01-08|  0.1|
| A2| 2021-01-09| 36|2021-01-08|  0.1|
| A2| 2021-01-10|  9|2021-01-08|  0.1|
| A2| 2021-01-11| 32|2021-01-08|  0.1|
| A2| 2021-01-12| 10|2021-01-15|  0.9|
| A2| 2021-01-13| 47|2021-01-15|  0.9|
| A2| 2021-01-14| 42|2021-01-15|  0.9|
| A2| 2021-01-15|  1|2021-01-15|  0.9|
| A2| 2021-01-16| 22|2021-01-15|  0.9|
| A2| 2021-01-17| 27|2021-01-15|  0.9|
| A2| 2021-01-18| 49|2021-01-15|  0.9|
| A2| 2021-01-19| 18|2021-01-22|  0.3|
| A2| 2021-01-20| 28|2021-01-22|  0.3|
+---+-----------+---+----------+-----+
only showing top 20 rows
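
If you prefer to stay in Spark SQL, the same window-function approach can be written as a single query. This is a sketch assuming df_a and df_b are registered as the temp views a and b, as above:

spark.sql("""
    SELECT id, ingest_date, amt, weekly_dt, score
    FROM (
        SELECT
            a.id,
            a.ingest_date,
            a.amt,
            b.weekly_dt,
            b.score,
            ROW_NUMBER() OVER (
                PARTITION BY a.id, a.ingest_date
                ORDER BY ABS(DATEDIFF(a.ingest_date, b.weekly_dt))
            ) AS rnk
        FROM a
        LEFT JOIN b
            ON b.id = a.id
    ) t
    WHERE rnk = 1
""").show()

ROW_NUMBER keeps exactly one row per (id, ingest_date) even if two weekly_dt values happen to be equally close.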
