需要帮助优化以下多(6)个数据帧之间的多连接场景。是否有任何方法可以优化DF之间的洗牌交换,因为join键在join DF之间是相同的。
final_df = DF1.join(DF2,['ID1','ID2','ID3'],'leftouter')
final_df = final_df.join(DF3,['ID1','ID2','ID3'],'leftouter')
final_df = final_df.join(DF4,['ID1','ID2','ID3'],'leftouter')
final_df = final_df.join(DF5,['ID1','ID2','ID3'],'leftouter')
final_df = final_df.join(DF6,['ID1','ID2','ID3'],'leftouter')
任何帮助都是感激的。由于
我可以看到两种可能的优化在你的情况下,
-
如果您的一些数据帧相对较小,则可以广播这些小数据帧,让我们假设DF4和DF6是小的,你可以这样做:
from pyspark.sql.functions import broadcast final_df = DF1.join(DF2,['ID1','ID2','ID3'],'leftouter') final_df = final_df.join(DF3,['ID1','ID2','ID3'],'leftouter') final_df = final_df.join(broadcast(DF4),['ID1','ID2','ID3'],'leftouter') final_df = final_df.join(DF5,['ID1','ID2','ID3'],'leftouter') final_df = final_df.join(broadcast(DF6),['ID1','ID2','ID3'],'leftouter')
这可能很棘手,因为小数据框架可能相对根据上下文和集群的大小不同,所以如果不确定,可以单独从其他的,看看是否可以
-
在连接时重新划分数据帧总是好的列,在加入之前,一个好地方是当你是读取数据框架,例如:
DF1 = spark.read.xxx。repartition(80, "ID1", "ID2", "ID3">
将80替换为所有不同执行器的核数乘以2,3或4,所以如果集群中有20个核心尝试按40,60或80重新分区,并保留给出最好的结果。
在所有这些之后,你可能会遇到特殊的问题,如数据倾斜,非常大的数据帧…,你需要单独处理。
希望有帮助。