Optimize Spark Shuffle Multi Join



需要帮助优化以下多(6)个数据帧之间的多连接场景。是否有任何方法可以优化DF之间的洗牌交换,因为join键在join DF之间是相同的。

final_df = DF1.join(DF2,['ID1','ID2','ID3'],'leftouter')
final_df = final_df.join(DF3,['ID1','ID2','ID3'],'leftouter')
final_df = final_df.join(DF4,['ID1','ID2','ID3'],'leftouter')
final_df = final_df.join(DF5,['ID1','ID2','ID3'],'leftouter')
final_df = final_df.join(DF6,['ID1','ID2','ID3'],'leftouter')

任何帮助都是感激的。由于

我可以看到两种可能的优化在你的情况下,

  1. 如果您的一些数据帧相对较小,则可以广播这些小数据帧,让我们假设DF4和DF6是小的,你可以这样做:

    from pyspark.sql.functions import broadcast
    final_df = DF1.join(DF2,['ID1','ID2','ID3'],'leftouter')
    final_df = final_df.join(DF3,['ID1','ID2','ID3'],'leftouter')
    final_df = final_df.join(broadcast(DF4),['ID1','ID2','ID3'],'leftouter')
    final_df = final_df.join(DF5,['ID1','ID2','ID3'],'leftouter')
    final_df = final_df.join(broadcast(DF6),['ID1','ID2','ID3'],'leftouter')
    

    这可能很棘手,因为小数据框架可能相对根据上下文和集群的大小不同,所以如果不确定,可以单独从其他的,看看是否可以

  2. 在连接时重新划分数据帧总是好的列,在加入之前,一个好地方是当你是读取数据框架,例如:

    DF1 = spark.read.xxx。repartition(80, "ID1", "ID2", "ID3">

    将80替换为所有不同执行器的核数乘以2,3或4,所以如果集群中有20个核心尝试按40,60或80重新分区,并保留给出最好的结果。

在所有这些之后,你可能会遇到特殊的问题,如数据倾斜,非常大的数据帧…,你需要单独处理。

希望有帮助。

相关内容

  • 没有找到相关文章

最新更新