Spark数据帧不同的写入将输出大小增加了近10倍



我有一个情况,我试图写一些结果使用数据框写入S3使用以下查询与input_table_1大小为13 Gb和input_table_2为1 Mb

input_table_1有account、membership和Input_table_2有列role, id, membership_id, quantity, start_date

SELECT 
/*+ BROADCASTJOIN(input_table_2) */
account,
role,
id,
quantity,
cast(start_date AS string) AS start_date
FROM
input_table_1
INNER JOIN
input_table_2
ON array_contains(input_table_1.membership, input_table_2.membership_id)

这个使用Spark数据框架编写的数据集在S3中生成了大约1.1TiB的数据,大约有7000亿条记录。

我们发现存在重复,并使用datafframe .distinct.write.parquet("s3path")来删除重复。记录计数减少到之前总数的1/3,大约有2000亿行,但是我们观察到S3中的输出大小现在是17.2 TiB。

我很困惑这是怎么发生的。

我已经使用了以下spark配置设置

spark.sql.shuffle.partitions = 20000

我已经尝试合并并写入s3,但它不工作。

请建议这是预期的,什么时候可以完成?

这有两个方面:

1) Spark中distinct的物理翻译

Spark催化剂优化器通过ReplaceDeduplicateWithAggregate规则将distinct操作转化为聚合(注:在执行计划中,distinct被命名为Deduplicate)。

这基本上意味着所有列上的df.distinct()都被转换为所有列上的groupBydf.groupBy(df.columns:_*).agg(Map.empty).

Spark在各自的列上为groupBy变换数据时使用HashPartitioner。由于groupBy子句在您的示例中包含所有列(好吧,隐式地,但它确实包含),因此您或多或少地随机将数据转移到集群中的不同节点。

在这种情况下增加spark.sql.shuffle.partitions是没有用的。

现在看第二面,为什么这对你的拼花文件的大小有这么大的影响?

2)拼花文件压缩

Parquet是一种列格式,它表示你的数据是按列而不是逐行组织的。这允许强大的压缩,如果数据是适当的布局&命令。例如,如果一列在连续的行中包含相同的值,只写一次该值并记录重复的次数就足够了(一种称为run length encoding的策略)。但是Parquet还使用了各种其他压缩策略。

不幸的是,在您的情况下,在洗牌删除重复项后,数据最终是相当随机的。input_table_1的原始分区拟合得更好。

解决方案如何解决这个问题没有单一的答案,但我建议接下来做一些提示:

  • 是什么导致了重复?这些能从上游移除吗?还是连接条件有问题导致重复?

  • 一个简单的解决方案是在distinct之后重新分区数据集,以匹配输入数据的分区。添加二级排序(sortWithinPartition)可能会为您提供更好的压缩。然而,这是以额外的洗牌为代价的!

    • 正如@matt-andruff在下面指出的,您也可以使用cluster by在SQL中实现这一点。显然,这还需要将distinct关键字移动到SQL语句中。
  • 将您自己的重复数据删除算法写入SparkAggregator,并以有意义的方式将数据分组/shuffle一次。

相关内容

  • 没有找到相关文章

最新更新