I have the two tables below, and I'm trying to join them on id plus the weekly_dt date that is closest to the ingest_date column, but I get:
AnalysisException: Correlated scalar subqueries must be aggregated: GlobalLimit 1
Setup
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
np.random.seed(25)
A1 = [('A1', i.date(), np.random.randint(0,50)) for i in pd.date_range('01 Jan 2021', '21 Jan 2021',freq='D')]
A2 = [('A2', i.date(), np.random.randint(0,50)) for i in pd.date_range('01 Jan 2021', '21 Jan 2021',freq='D')]
df_a = spark.createDataFrame(A1 + A2, ['id','ingest_date','amt'])
weekly_scores = [
('A1', pd.Timestamp('01 Jan 2021').date(), '0.5'),
('A1', pd.Timestamp('08 Jan 2021').date(), '0.3'),
('A1', pd.Timestamp('15 Jan 2021').date(), '0.8'),
('A1', pd.Timestamp('22 Jan 2021').date(), '0.6'),
('A2', pd.Timestamp('01 Jan 2021').date(), '0.6'),
('A2', pd.Timestamp('08 Jan 2021').date(), '0.1'),
('A2', pd.Timestamp('15 Jan 2021').date(), '0.9'),
('A2', pd.Timestamp('22 Jan 2021').date(), '0.3'),
]
df_b = spark.createDataFrame(weekly_scores, ['id','weekly_dt','score'])

# register temp views so the SQL below can refer to the DataFrames as a and b
df_a.createOrReplaceTempView('a')
df_b.createOrReplaceTempView('b')
Tables
df_a.show()
+---+-----------+---+
| id|ingest_date|amt|
+---+-----------+---+
| A1| 2021-01-01| 26|
| A1| 2021-01-02| 1|
| A1| 2021-01-03| 0|
| A1| 2021-01-04| 31|
| A1| 2021-01-05| 41|
| A1| 2021-01-06| 46|
| A1| 2021-01-07| 11|
| A1| 2021-01-08| 0|
| A1| 2021-01-09| 14|
| A1| 2021-01-10| 5|
| A1| 2021-01-11| 0|
| A1| 2021-01-12| 35|
| A1| 2021-01-13| 5|
| A1| 2021-01-14| 43|
| A1| 2021-01-15| 18|
| A1| 2021-01-16| 31|
| A1| 2021-01-17| 44|
| A1| 2021-01-18| 25|
| A1| 2021-01-19| 47|
| A1| 2021-01-20| 36|
+---+-----------+---+
df_b.show()
+---+----------+-----+
| id| weekly_dt|score|
+---+----------+-----+
| A1|2021-01-01| 0.5|
| A1|2021-01-08| 0.3|
| A1|2021-01-15| 0.8|
| A1|2021-01-22| 0.6|
| A2|2021-01-01| 0.6|
| A2|2021-01-08| 0.1|
| A2|2021-01-15| 0.9|
| A2|2021-01-22| 0.3|
+---+----------+-----+
Expected output:
id ingest_date amt weekly_dt score
0 A1 2021-01-01 26 2021-01-01 0.5
4 A1 2021-01-02 1 2021-01-01 0.5
8 A1 2021-01-03 0 2021-01-01 0.5
12 A1 2021-01-04 31 2021-01-01 0.5
17 A1 2021-01-05 41 2021-01-08 0.3
21 A1 2021-01-06 46 2021-01-08 0.3
25 A1 2021-01-07 11 2021-01-08 0.3
29 A1 2021-01-08 0 2021-01-08 0.3
33 A1 2021-01-09 14 2021-01-08 0.3
37 A1 2021-01-10 5 2021-01-08 0.3
41 A1 2021-01-11 0 2021-01-08 0.3
46 A1 2021-01-12 35 2021-01-15 0.8
50 A1 2021-01-13 5 2021-01-15 0.8
54 A1 2021-01-14 43 2021-01-15 0.8
58 A1 2021-01-15 18 2021-01-15 0.8
62 A1 2021-01-16 31 2021-01-15 0.8
66 A1 2021-01-17 44 2021-01-15 0.8
70 A1 2021-01-18 25 2021-01-15 0.8
75 A1 2021-01-19 47 2021-01-22 0.6
79 A1 2021-01-20 36 2021-01-22 0.6
83 A1 2021-01-21 43 2021-01-22 0.6
84 A2 2021-01-01 32 2021-01-01 0.6
88 A2 2021-01-02 37 2021-01-01 0.6
92 A2 2021-01-03 11 2021-01-01 0.6
96 A2 2021-01-04 21 2021-01-01 0.6
101 A2 2021-01-05 29 2021-01-08 0.1
105 A2 2021-01-06 48 2021-01-08 0.1
109 A2 2021-01-07 12 2021-01-08 0.1
113 A2 2021-01-08 40 2021-01-08 0.1
117 A2 2021-01-09 30 2021-01-08 0.1
121 A2 2021-01-10 28 2021-01-08 0.1
125 A2 2021-01-11 41 2021-01-08 0.1
130 A2 2021-01-12 12 2021-01-15 0.9
134 A2 2021-01-13 10 2021-01-15 0.9
138 A2 2021-01-14 10 2021-01-15 0.9
142 A2 2021-01-15 31 2021-01-15 0.9
146 A2 2021-01-16 13 2021-01-15 0.9
150 A2 2021-01-17 31 2021-01-15 0.9
154 A2 2021-01-18 11 2021-01-15 0.9
159 A2 2021-01-19 15 2021-01-22 0.3
163 A2 2021-01-20 18 2021-01-22 0.3
167 A2 2021-01-21 49 2021-01-22 0.3
Spark Query
SELECT
    a.id,
    a.ingest_date,
    a.amt,
    b.weekly_dt,
    b.score
FROM a
LEFT JOIN b
    ON a.id = b.id
    AND a.ingest_date =
    (
        SELECT weekly_dt
        FROM b
        WHERE id = b.id
        ORDER BY DATEDIFF(a.ingest_date, weekly_dt) ASC
        LIMIT 1
    )
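The analyzer rejects the ORDER BY ... LIMIT 1 form because a correlated scalar subquery has to collapse to a single aggregated value. A minimal sketch of an aggregated rewrite, assuming Spark 3.0+ (for MIN_BY), that the subquery is meant to correlate on a.id (the b2 alias is mine), and that your Spark version accepts a correlated scalar subquery inside this join condition:

# sketch: aggregate instead of ORDER BY ... LIMIT 1
# assumes Spark >= 3.0 (MIN_BY) and that the correlated scalar subquery
# is allowed in this join condition on your Spark version
spark.sql("""
    SELECT
        a.id,
        a.ingest_date,
        a.amt,
        b.weekly_dt,
        b.score
    FROM a
    LEFT JOIN b
        ON a.id = b.id
        AND b.weekly_dt = (
            -- pick the weekly_dt with the smallest absolute day difference
            SELECT MIN_BY(b2.weekly_dt, ABS(DATEDIFF(a.ingest_date, b2.weekly_dt)))
            FROM b AS b2
            WHERE b2.id = a.id
        )
""").show()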
Edit:
I know I can create a window and use dense_rank()
to order the results, but I wonder whether this is the best approach?
from pyspark.sql import Window
from pyspark.sql import functions as F

s = spark.sql("""
    SELECT
        a.id,
        a.ingest_date,
        a.amt,
        b.weekly_dt,
        b.score
    FROM a
    LEFT JOIN b
        ON b.id = a.id
""").withColumn(
    'delta',
    F.abs(F.datediff(F.col('ingest_date'), F.col('weekly_dt')))
)

s.withColumn(
    't',
    F.dense_rank().over(
        Window.partitionBy('id', 'ingest_date').orderBy(F.asc('delta'))
    )
).filter('t == 1').drop('t', 'delta').show()
id ingest_date amt weekly_dt score
0 A2 2021-01-01 32 2021-01-01 0.6
1 A2 2021-01-02 37 2021-01-01 0.6
2 A2 2021-01-03 11 2021-01-01 0.6
3 A2 2021-01-04 21 2021-01-01 0.6
4 A2 2021-01-05 29 2021-01-08 0.1
5 A2 2021-01-06 48 2021-01-08 0.1
6 A2 2021-01-07 12 2021-01-08 0.1
7 A2 2021-01-08 40 2021-01-08 0.1
8 A2 2021-01-09 30 2021-01-08 0.1
9 A2 2021-01-10 28 2021-01-08 0.1
10 A2 2021-01-11 41 2021-01-08 0.1
11 A2 2021-01-12 12 2021-01-15 0.9
12 A2 2021-01-13 10 2021-01-15 0.9
13 A2 2021-01-14 10 2021-01-15 0.9
14 A2 2021-01-15 31 2021-01-15 0.9
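A note on dense_rank here: if an ingest_date lands exactly midway between two weekly_dt values, both rows get rank 1 and the filter keeps two matches for that day. row_number always keeps exactly one row, but the winner is arbitrary unless you add an explicit tie-breaker; a small sketch (preferring the later week is just an assumed choice):

from pyspark.sql import Window, functions as F

w = (
    Window
    .partitionBy('id', 'ingest_date')
    .orderBy(
        F.abs(F.datediff('ingest_date', 'weekly_dt')),  # closest week first
        F.col('weekly_dt').desc()                       # tie-break: prefer the later week
    )
)
s.withColumn('t', F.row_number().over(w)).filter('t == 1').drop('t', 'delta').show()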
I would replace the subquery with LIMIT by a window function:
from pyspark.sql import functions as F
from pyspark.sql import Window as W

df = df_a.join(df_b, on="id")
df = (
df.withColumn(
"rnk",
F.row_number().over(
W.partitionBy("id", "ingest_date").orderBy(
F.abs(F.datediff("ingest_date", "weekly_dt"))
)
),
)
.where("rnk=1")
.drop("rnk")
)
df.show()
+---+-----------+---+----------+-----+
| id|ingest_date|amt| weekly_dt|score|
+---+-----------+---+----------+-----+
| A2| 2021-01-01| 31|2021-01-01| 0.6|
| A2| 2021-01-02| 48|2021-01-01| 0.6|
| A2| 2021-01-03| 47|2021-01-01| 0.6|
| A2| 2021-01-04| 9|2021-01-01| 0.6|
| A2| 2021-01-05| 16|2021-01-08| 0.1|
| A2| 2021-01-06| 44|2021-01-08| 0.1|
| A2| 2021-01-07| 45|2021-01-08| 0.1|
| A2| 2021-01-08| 21|2021-01-08| 0.1|
| A2| 2021-01-09| 36|2021-01-08| 0.1|
| A2| 2021-01-10| 9|2021-01-08| 0.1|
| A2| 2021-01-11| 32|2021-01-08| 0.1|
| A2| 2021-01-12| 10|2021-01-15| 0.9|
| A2| 2021-01-13| 47|2021-01-15| 0.9|
| A2| 2021-01-14| 42|2021-01-15| 0.9|
| A2| 2021-01-15| 1|2021-01-15| 0.9|
| A2| 2021-01-16| 22|2021-01-15| 0.9|
| A2| 2021-01-17| 27|2021-01-15| 0.9|
| A2| 2021-01-18| 49|2021-01-15| 0.9|
| A2| 2021-01-19| 18|2021-01-22| 0.3|
| A2| 2021-01-20| 28|2021-01-22| 0.3|
+---+-----------+---+----------+-----+
only showing top 20 rows
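One difference from the original SQL: join(df_b, on="id") is an inner join, so an id that has daily rows but no weekly score would disappear. A minimal sketch of a left-join variant that keeps such rows with null weekly_dt/score, assuming that is the behaviour you want:

from pyspark.sql import Window as W, functions as F

df = df_a.join(df_b, on="id", how="left")
df = (
    df.withColumn(
        "rnk",
        F.row_number().over(
            W.partitionBy("id", "ingest_date").orderBy(
                # nulls (no weekly score for this id) sort last,
                # so the unmatched row is still kept with rnk = 1
                F.abs(F.datediff("ingest_date", "weekly_dt")).asc_nulls_last()
            )
        ),
    )
    .where("rnk = 1")
    .drop("rnk")
)
df.show()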