我有一个情况,我试图写一些结果使用数据框写入S3使用以下查询与input_table_1大小为13 Gb和input_table_2为1 Mb
input_table_1有account、membership和Input_table_2有列role, id, membership_id, quantity, start_date
SELECT
/*+ BROADCASTJOIN(input_table_2) */
account,
role,
id,
quantity,
cast(start_date AS string) AS start_date
FROM
input_table_1
INNER JOIN
input_table_2
ON array_contains(input_table_1.membership, input_table_2.membership_id)
这个使用Spark数据框架编写的数据集在S3中生成了大约1.1TiB的数据,大约有7000亿条记录。
我们发现存在重复,并使用datafframe .distinct.write.parquet("s3path")来删除重复。记录计数减少到之前总数的1/3,大约有2000亿行,但是我们观察到S3中的输出大小现在是17.2 TiB。
我很困惑这是怎么发生的。
我已经使用了以下spark配置设置
spark.sql.shuffle.partitions = 20000
我已经尝试合并并写入s3,但它不工作。
请建议这是预期的,什么时候可以完成?
这有两个方面:
1) Spark中distinct的物理翻译
Spark催化剂优化器通过ReplaceDeduplicateWithAggregate
规则将distinct
操作转化为聚合(注:在执行计划中,distinct
被命名为Deduplicate
)。
这基本上意味着所有列上的df.distinct()
都被转换为所有列上的groupBy
。df.groupBy(df.columns:_*).agg(Map.empty)
.
Spark在各自的列上为groupBy
变换数据时使用HashPartitioner
。由于groupBy
子句在您的示例中包含所有列(好吧,隐式地,但它确实包含),因此您或多或少地随机将数据转移到集群中的不同节点。
在这种情况下增加spark.sql.shuffle.partitions
是没有用的。
现在看第二面,为什么这对你的拼花文件的大小有这么大的影响?
2)拼花文件压缩
Parquet是一种列格式,它表示你的数据是按列而不是逐行组织的。这允许强大的压缩,如果数据是适当的布局&命令。例如,如果一列在连续的行中包含相同的值,只写一次该值并记录重复的次数就足够了(一种称为run length encoding
的策略)。但是Parquet还使用了各种其他压缩策略。
不幸的是,在您的情况下,在洗牌删除重复项后,数据最终是相当随机的。input_table_1
的原始分区拟合得更好。
解决方案如何解决这个问题没有单一的答案,但我建议接下来做一些提示:
是什么导致了重复?这些能从上游移除吗?还是连接条件有问题导致重复?
一个简单的解决方案是在
distinct
之后重新分区数据集,以匹配输入数据的分区。添加二级排序(sortWithinPartition
)可能会为您提供更好的压缩。然而,这是以额外的洗牌为代价的!- 正如@matt-andruff在下面指出的,您也可以使用
cluster by
在SQL中实现这一点。显然,这还需要将distinct
关键字移动到SQL语句中。
- 正如@matt-andruff在下面指出的,您也可以使用
将您自己的重复数据删除算法写入Spark
Aggregator
,并以有意义的方式将数据分组/shuffle一次。