连接两个大表并获取最新值



我想连接两个具有数百万行的Spark数据帧。假设"id"是两个数据帧的公共列。两者都有"日期"列。但是,两个表中的日期可能不匹配。如果第一个表中的记录在第二个表中没有匹配的日期,则对于第二个表格中的"value"列,应采用最近的观测值。因此,我不能在"id"one_answers"date"加入。我在下面创建了一个示例数据帧。在数据量巨大的情况下,执行此操作的最佳方式是什么?

import pandas as pd
a = pd.DataFrame({'id':[1,2,3,1,2,3,1,2,3, 1,2,3], 'date': ['2020-01-01', '2020-01-01', '2020-01-01', '2020-01-08', '2020-01-08', '2020-01-08', '2020-01-21', '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31', '2020-01-31']})
a = spark.createDataFrame(a)
b = pd.DataFrame({'id':[1,2,3,1,2,1,3,1,2], 'date': ['2019-12-25', '2019-12-25', '2019-12-25', '2020-01-08', '2020-01-08',  '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31'], 'value': [0.1,0.2,0.3,1,2,10,30,0.1,0.2]})
b = spark.createDataFrame(b)
required_result = pd.DataFrame({'id':[1,2,3,1,2,3,1,2,3, 1,2,3], 'date': ['2020-01-01', '2020-01-01', '2020-01-01', '2020-01-08', '2020-01-08', '2020-01-08', '2020-01-21', '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31', '2020-01-31'],
'value': [0.1,0.2,0.3, 1,2,0.3,10, 2,30,0.1,0.2, 30]})

您可以在id上加入,并保留第二个数据帧中的日期,这些日期等于或低于第一个数据帧的日期。

data1_sdf.join(data2_sdf.withColumnRenamed('date', 'date_b'), 
[data1_sdf.id == data2_sdf.id, 
data1_sdf.date >= func.col('date_b')], 
'left'
). 
drop(data2_sdf.id). 
withColumn('dates_diff', func.datediff('date_b', 'date')). 
withColumn('max_dtdiff', 
func.max('dates_diff').over(wd.partitionBy('id', 'date'))
). 
filter(func.col('max_dtdiff') == func.col('dates_diff')). 
drop('dates_diff', 'max_dtdiff'). 
orderBy('id', 'date'). 
show()
# +---+----------+----------+-----+
# | id|      date|    date_b|value|
# +---+----------+----------+-----+
# |  1|2020-01-01|2019-12-25|  0.1|
# |  1|2020-01-08|2020-01-08|  1.0|
# |  1|2020-01-21|2020-01-21| 10.0|
# |  1|2020-01-31|2020-01-31|  0.1|
# |  2|2020-01-01|2019-12-25|  0.2|
# |  2|2020-01-08|2020-01-08|  2.0|
# |  2|2020-01-21|2020-01-08|  2.0|
# |  2|2020-01-31|2020-01-31|  0.2|
# |  3|2020-01-01|2019-12-25|  0.3|
# |  3|2020-01-08|2019-12-25|  0.3|
# |  3|2020-01-21|2020-01-21| 30.0|
# |  3|2020-01-31|2020-01-21| 30.0|
# +---+----------+----------+-----+

看起来,您可以只在id上加入,因为这个键看起来分布良好。您可以聚合一点dfb,连接两个dfs,然后过滤并提取具有最大日期的值。

from pyspark.sql import functions as F
b = b.groupBy('id').agg(F.collect_list(F.array('date', 'value')).alias('dv'))
df = a.join(b, 'id', 'left')
df = df.select(
a['*'],
F.array_max(F.filter('dv', lambda x: x[0] <= F.col('date')))[1].alias('value')
)
df.show()
# +---+----------+-----+
# | id|      date|value|
# +---+----------+-----+
# |  1|2020-01-01|  0.1|
# |  1|2020-01-08|  1.0|
# |  3|2020-01-01|  0.3|
# |  3|2020-01-08|  0.3|
# |  2|2020-01-01|  0.2|
# |  2|2020-01-08|  2.0|
# |  1|2020-01-21| 10.0|
# |  1|2020-01-31|  0.1|
# |  3|2020-01-21| 30.0|
# |  3|2020-01-31| 30.0|
# |  2|2020-01-21|  2.0|
# |  2|2020-01-31|  0.2|
# +---+----------+-----+

最新更新