我程序的流量就是这样:
1.从镶木木文件中读取40亿行(〜700GB)的数据到数据框架。使用的分区尺寸为2296
2.清洁并过滤掉25亿行
3.使用管道模型,然后是训练有素的模型,将剩余的15亿行转换。使用逻辑回归模型对模型进行训练,该模型预测数据的0或1和30%的数据被从转换的数据框架中滤除。
4.上面的数据框将外部与〜1 tb的另一个数据集(也从镶木quet文件中读取。)分区大小为4000
5.加入另一个大约100 MB的数据集,例如
joined_data = data1.join(broadcast(small_dataset_100MB), data1.field == small_dataset_100MB.field, "left_outer")
6.然后将上述数据框架爆炸到〜2000
exploded_data = joined_data.withColumn('field', explode('field_list'))
7.执行聚合
aggregate = exploded_data.groupBy(*cols_to_select)
.agg(F.countDistinct(exploded_data.field1).alias('distincts'), F.count("*").alias('count_all'))
cols_to_select
列表中总共有10列。
8.最后一个动作,执行aggregate.count()
。
问题是,第三个最后的计数阶段(200个任务)永远陷入了任务199。尽管分配了4个核心和56个执行者,但计数仅使用一个核心和一个执行人来运行该作业。我尝试将大小从40亿行减少到7亿行(1/6),这花了四个小时。我真的很感谢您在如何加快此过程的帮助下,谢谢
由于将偏斜的数据连接到巨大的数据集,因此操作被粘在最后任务上。加入两个数据范围的钥匙被严重倾斜。目前,通过从数据框架中删除偏斜的数据来解决该问题。如果您必须包含偏斜的数据,则可以使用迭代广播JON(https://github.com/godatadriven/iterative-broadcast-join)。查看此内容丰富的视频以获取更多详细信息https://www.youtube.com/watch?v=6zg7ntw-ktq