我想连接两个具有数百万行的Spark数据帧。假设"id"是两个数据帧的公共列。两者都有"日期"列。但是,两个表中的日期可能不匹配。如果第一个表中的记录在第二个表中没有匹配的日期,则对于第二个表格中的"value"列,应采用最近的观测值。因此,我不能在"id"one_answers"date"加入。我在下面创建了一个示例数据帧。在数据量巨大的情况下,执行此操作的最佳方式是什么?
import pandas as pd
a = pd.DataFrame({'id':[1,2,3,1,2,3,1,2,3, 1,2,3], 'date': ['2020-01-01', '2020-01-01', '2020-01-01', '2020-01-08', '2020-01-08', '2020-01-08', '2020-01-21', '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31', '2020-01-31']})
a = spark.createDataFrame(a)
b = pd.DataFrame({'id':[1,2,3,1,2,1,3,1,2], 'date': ['2019-12-25', '2019-12-25', '2019-12-25', '2020-01-08', '2020-01-08', '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31'], 'value': [0.1,0.2,0.3,1,2,10,30,0.1,0.2]})
b = spark.createDataFrame(b)
required_result = pd.DataFrame({'id':[1,2,3,1,2,3,1,2,3, 1,2,3], 'date': ['2020-01-01', '2020-01-01', '2020-01-01', '2020-01-08', '2020-01-08', '2020-01-08', '2020-01-21', '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31', '2020-01-31'],
'value': [0.1,0.2,0.3, 1,2,0.3,10, 2,30,0.1,0.2, 30]})
您可以在id
上加入,并保留第二个数据帧中的日期,这些日期等于或低于第一个数据帧的日期。
data1_sdf.join(data2_sdf.withColumnRenamed('date', 'date_b'),
[data1_sdf.id == data2_sdf.id,
data1_sdf.date >= func.col('date_b')],
'left'
).
drop(data2_sdf.id).
withColumn('dates_diff', func.datediff('date_b', 'date')).
withColumn('max_dtdiff',
func.max('dates_diff').over(wd.partitionBy('id', 'date'))
).
filter(func.col('max_dtdiff') == func.col('dates_diff')).
drop('dates_diff', 'max_dtdiff').
orderBy('id', 'date').
show()
# +---+----------+----------+-----+
# | id| date| date_b|value|
# +---+----------+----------+-----+
# | 1|2020-01-01|2019-12-25| 0.1|
# | 1|2020-01-08|2020-01-08| 1.0|
# | 1|2020-01-21|2020-01-21| 10.0|
# | 1|2020-01-31|2020-01-31| 0.1|
# | 2|2020-01-01|2019-12-25| 0.2|
# | 2|2020-01-08|2020-01-08| 2.0|
# | 2|2020-01-21|2020-01-08| 2.0|
# | 2|2020-01-31|2020-01-31| 0.2|
# | 3|2020-01-01|2019-12-25| 0.3|
# | 3|2020-01-08|2019-12-25| 0.3|
# | 3|2020-01-21|2020-01-21| 30.0|
# | 3|2020-01-31|2020-01-21| 30.0|
# +---+----------+----------+-----+
看起来,您可以只在id
上加入,因为这个键看起来分布良好。您可以聚合一点dfb
,连接两个dfs,然后过滤并提取具有最大日期的值。
from pyspark.sql import functions as F
b = b.groupBy('id').agg(F.collect_list(F.array('date', 'value')).alias('dv'))
df = a.join(b, 'id', 'left')
df = df.select(
a['*'],
F.array_max(F.filter('dv', lambda x: x[0] <= F.col('date')))[1].alias('value')
)
df.show()
# +---+----------+-----+
# | id| date|value|
# +---+----------+-----+
# | 1|2020-01-01| 0.1|
# | 1|2020-01-08| 1.0|
# | 3|2020-01-01| 0.3|
# | 3|2020-01-08| 0.3|
# | 2|2020-01-01| 0.2|
# | 2|2020-01-08| 2.0|
# | 1|2020-01-21| 10.0|
# | 1|2020-01-31| 0.1|
# | 3|2020-01-21| 30.0|
# | 3|2020-01-31| 30.0|
# | 2|2020-01-21| 2.0|
# | 2|2020-01-31| 0.2|
# +---+----------+-----+