为什么repartition()方法会增加磁盘上的文件大小



我正在使用的数据湖(df)有2 TB的数据和20000个文件。我想把数据集压缩成2000个1GB的文件。

如果运行df.coalesce(2000)并将其写入磁盘,则数据湖包含1.9 TB的数据。

如果运行df.repartition(2000)并将其写入磁盘,则数据湖包含2.6 TB的数据。

repartition()数据湖中的每个文件都比预期大0.3 GB(它们都是1.3 GB的文件,而不是1 GB的文件)。

为什么repartition()方法会增加整个数据湖的大小?

有一个相关的问题讨论了为什么在运行聚合后数据湖的大小会增加。答案是:

通常,Parquet等列存储格式在数据分布(数据组织)和单个列的基数方面非常敏感。数据越有组织,基数越低,存储的效率就越高。

coalesce()算法是否提供更有组织性的数据。。。我不这么认为…

我认为另一个问题不能回答我的问题。

免责声明

这个答案主要包含推测。对这种现象的详细解释可能需要对输入和输出(或至少它们各自的元数据)进行深入分析。

观察

  1. 熵有效地限制了最强无损压缩的性能-维基百科-熵(信息论)
  2. 持久列格式和内部Spark SQL表示都透明地应用了不同的压缩技术(如运行长度编码或字典编码),以减少存储数据的内存占用。

    此外,磁盘上的格式(包括纯文本数据)可以使用通用压缩算法进行显式压缩——目前尚不清楚是否是这种情况。

  3. 压缩(显式或透明)应用于数据块(通常是分区,但可以使用较小的单元)。

  4. 基于1)、2)和3),我们可以假设平均压缩率将取决于集群中数据的分布。我们还应该注意,如果上游谱系包含广泛的转换,那么最终结果可能是不确定的。

coalescerepartition的可能影响

通常,coalesce可以采用两种路径:

  • 通过管道升级到源——这是最常见的情况
  • 传播到最近的洗牌

在第一种情况下,我们可以预期压缩率将与输入的压缩率相当。然而,在某些情况下,可以实现更小的最终输出。让我们想象一个退化的数据集:

val df = sc.parallelize(
Seq("foo", "foo", "foo", "bar", "bar", "bar"),
6 
).toDF

如果像这样的数据集被写入磁盘,就没有压缩的可能性——每个值都必须按原样写入:

df.withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  1|
|  foo|  2|
|  bar|  3|
|  bar|  4|
|  bar|  5|
+-----+---+

换句话说,我们大约需要6*3个字节,总共需要18个字节。

然而,如果我们联合

df.coalesce(2).withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  0|
|  foo|  0|
|  bar|  1|
|  bar|  1|
|  bar|  1|
+-----+---+

例如,我们可以应用以小int为计数的RLE,并存储每个分区3+1个字节,总共给出8个字节。

这当然是一个巨大的过于简单化,但表明了保留低熵输入结构和合并块可以减少内存占用。

第二种coalesce场景不太明显,但在某些场景中,上游过程可以减少熵(例如,考虑窗口函数),保留这样的结构将是有益的。

repartition怎么样?

在没有分区表达式的情况下,repartition应用RoundRobinPartitioning(实现为具有基于分区id的伪随机密钥的HashPartitioning)。只要散列函数的行为合理,这种重新分布就应该最大化数据的熵,从而降低可能的压缩率。

结论

coalesce不应该单独提供任何特定的好处,但可以保留数据分发的现有属性——在某些情况下,这种属性可能是有利的。

repartition,由于其性质,平均来说会使情况变得更糟,除非数据的熵已经最大化(情况可能会有所改善,但在非平凡的数据集上极不可能)。

最后,具有划分表达式的repartitionrepartitionByRange应该降低熵,提高压缩率。

注意

我们还应该记住,柱状格式通常基于运行时统计信息来决定特定的压缩/编码方法(或者没有)。因此,即使特定块中的行集是固定的,但行的顺序发生了变化,我们也可以观察到不同的结果。

我同意@10465355的回答。这里我有一个极端的例子。

数据处理

有一个表叫table_a。它的所有列都是字符串。其存储格式为Orc,由生成

insert overwrite table table_a
select a,b,...,i
from table_other
group by a,b,...,i

经过HashAggregate操作后,table_a中的数据得到了足够的组织。特别是第一列CCD_ 19。orc文件大小为6.97 MB(事实上,有一个2.09 KB的小文件,我后来忽略了它。)

然后,我们repartitiontable_a。

val querydf = spark.sql("""select *
from table_a distribute by rand()""").repartition(1)
querydf.createOrReplaceTempView("tmpTable")
spark.sql("""insert overwrite table table_a 
select a,b,...,i
from tmpTable""")

numpartitions=1时,Random(hashing.byteswap32(index)).nextInt(numPartitions)不触发随机再分配。因此,我们将distribute by rand()添加为等效于repartition(n),并得到一个大小为14.26MB的文件。

结果

我们可以使用hive --orcfiledump来获得orc文件的文件结构。

repartition:之前

Stripes:
Stripe: offset: 3 data: 7288854 rows: 668265 tail: 354 index: 13637
Stream: column 0 section ROW_INDEX start: 3 length 50
Stream: column 1 section ROW_INDEX start: 53 length 1706
Stream: column 2 section ROW_INDEX start: 1759 length 672
Stream: column 3 section ROW_INDEX start: 2431 length 2297
Stream: column 4 section ROW_INDEX start: 4728 length 1638
Stream: column 5 section ROW_INDEX start: 6366 length 1270
Stream: column 6 section ROW_INDEX start: 7636 length 1887
Stream: column 7 section ROW_INDEX start: 9523 length 1823
Stream: column 8 section ROW_INDEX start: 11346 length 1120
Stream: column 9 section ROW_INDEX start: 12466 length 1174
Stream: column 1 section DATA start: 13640 length 209662
Stream: column 1 section LENGTH start: 223302 length 1158
Stream: column 1 section DICTIONARY_DATA start: 224460 length 231328
Stream: column 2 section DATA start: 455788 length 29861
Stream: column 2 section LENGTH start: 485649 length 5
Stream: column 2 section DICTIONARY_DATA start: 485654 length 33
Stream: column 3 section DATA start: 485687 length 424936
Stream: column 3 section LENGTH start: 910623 length 4069
Stream: column 3 section DICTIONARY_DATA start: 914692 length 41298
Stream: column 4 section DATA start: 955990 length 443602
Stream: column 4 section LENGTH start: 1399592 length 4122
Stream: column 4 section DICTIONARY_DATA start: 1403714 length 56217
Stream: column 5 section DATA start: 1459931 length 475983
Stream: column 5 section LENGTH start: 1935914 length 2650
Stream: column 5 section DICTIONARY_DATA start: 1938564 length 17798
Stream: column 6 section DATA start: 1956362 length 480891
Stream: column 6 section LENGTH start: 2437253 length 4230
Stream: column 6 section DICTIONARY_DATA start: 2441483 length 27873
Stream: column 7 section DATA start: 2469356 length 2716359
Stream: column 7 section LENGTH start: 5185715 length 304679
Stream: column 8 section DATA start: 5490394 length 438723
Stream: column 8 section LENGTH start: 5929117 length 58072
Stream: column 8 section DICTIONARY_DATA start: 5987189 length 424961
Stream: column 9 section DATA start: 6412150 length 630248
Stream: column 9 section LENGTH start: 7042398 length 1455
Stream: column 9 section DICTIONARY_DATA start: 7043853 length 258641
Encoding column 0: DIRECT
Encoding column 1: DICTIONARY_V2[48184]
Encoding column 2: DICTIONARY_V2[3]
Encoding column 3: DICTIONARY_V2[4252]
Encoding column 4: DICTIONARY_V2[4398]
Encoding column 5: DICTIONARY_V2[4404]
Encoding column 6: DICTIONARY_V2[5553]
Encoding column 7: DIRECT_V2
Encoding column 8: DICTIONARY_V2[105667]
Encoding column 9: DICTIONARY_V2[60943]

重新分区后:

Stripes:
Stripe: offset: 3 data: 14940022 rows: 668284 tail: 344 index: 12312
Stream: column 0 section ROW_INDEX start: 3 length 50
Stream: column 1 section ROW_INDEX start: 53 length 1755
Stream: column 2 section ROW_INDEX start: 1808 length 678
Stream: column 3 section ROW_INDEX start: 2486 length 1815
Stream: column 4 section ROW_INDEX start: 4301 length 1297
Stream: column 5 section ROW_INDEX start: 5598 length 1217
Stream: column 6 section ROW_INDEX start: 6815 length 1841
Stream: column 7 section ROW_INDEX start: 8656 length 1330
Stream: column 8 section ROW_INDEX start: 9986 length 1289
Stream: column 9 section ROW_INDEX start: 11275 length 1040
Stream: column 1 section DATA start: 12315 length 4260547
Stream: column 1 section LENGTH start: 4272862 length 15955
Stream: column 2 section DATA start: 4288817 length 102153
Stream: column 2 section LENGTH start: 4390970 length 5
Stream: column 2 section DICTIONARY_DATA start: 4390975 length 33
Stream: column 3 section DATA start: 4391008 length 1033345
Stream: column 3 section LENGTH start: 5424353 length 4069
Stream: column 3 section DICTIONARY_DATA start: 5428422 length 41298
Stream: column 4 section DATA start: 5469720 length 1044769
Stream: column 4 section LENGTH start: 6514489 length 4122
Stream: column 4 section DICTIONARY_DATA start: 6518611 length 56217
Stream: column 5 section DATA start: 6574828 length 1142805
Stream: column 5 section LENGTH start: 7717633 length 2650
Stream: column 5 section DICTIONARY_DATA start: 7720283 length 17798
Stream: column 6 section DATA start: 7738081 length 1147888
Stream: column 6 section LENGTH start: 8885969 length 4230
Stream: column 6 section DICTIONARY_DATA start: 8890199 length 27873
Stream: column 7 section DATA start: 8918072 length 1705640
Stream: column 7 section LENGTH start: 10623712 length 208184
Stream: column 7 section DICTIONARY_DATA start: 10831896 length 1525605
Stream: column 8 section DATA start: 12357501 length 513225
Stream: column 8 section LENGTH start: 12870726 length 58100
Stream: column 8 section DICTIONARY_DATA start: 12928826 length 424905
Stream: column 9 section DATA start: 13353731 length 1338510
Stream: column 9 section LENGTH start: 14692241 length 1455
Stream: column 9 section DICTIONARY_DATA start: 14693696 length 258641
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DICTIONARY_V2[3]
Encoding column 3: DICTIONARY_V2[4252]
Encoding column 4: DICTIONARY_V2[4398]
Encoding column 5: DICTIONARY_V2[4404]
Encoding column 6: DICTIONARY_V2[5553]
Encoding column 7: DICTIONARY_V2[378283]
Encoding column 8: DICTIONARY_V2[105678]
Encoding column 9: DICTIONARY_V2[60943]

兽人同时使用游程编码和字典编码来压缩数据。以下是编码DICTIONARY_V2的含义。REF:ORCv1
|编码|流类型|可选|内容||--------|---------------|--------|------------||DICTIONARY_V2|PRESENT|Yes |布尔RLE|||DATA|No |无符号整数RLE v2|||DICTIONARY_DATA|No |字符串内容||| LENGTH |否|无符号整数RLE v2 |

在字典编码中,如果值为["Nevada"、"California"、"内华达"、"加利福尼亚"one_answers"Florida"];DICTIONARY_DATA将为"CaliforniaFloridaNevada",LENGTH将为[10,7,6]。DATA将是[2,0,2,0,1]。

并且Unsigned Integer RLE v2也在REF:ORCv1 中

在单元0.12中,ORC引入了运行长度编码版本2(RLEv2),该版本改进了压缩和固定位宽编码,以实现更快的扩展。RLEv2基于数据使用四个子编码:

  • 短重复-用于具有重复值的短序列
  • 直接-用于固定位宽的随机序列
  • Patched Base-用于可变位宽的随机序列
  • Delta-用于单调递增或递减序列

让我们关注第一列。

# before repartition
Stream: column 1 section DATA start: 13640 length 209662
Stream: column 1 section LENGTH start: 223302 length 1158
Stream: column 1 section DICTIONARY_DATA start: 224460 length 231328
Encoding column 1: DICTIONARY_V2[48184]
# after repartition
Stream: column 1 section DATA start: 12315 length 4260547
Stream: column 1 section LENGTH start: 4272862 length 15955
Encoding column 1: DIRECT_V2

虽然我不知道Orc是如何选择ENCODING的,但Orc认为在随机化后对列1使用DIRECT_V2比使用DICTIONARY_V2节省更多空间。事实上,在重新划分后,空间会变大近10倍。(4260547+15955)/(209662+1158+231328)

其他列的大多数ENCODING没有改变,但大小有所增加。

比较

repartitionVScoalesce:
前者的文件大小是一致的,以避免数据偏斜
前者的数据大小变大
*(潜在)*ORC的行组索引在过滤混沌数据时不能使用。加入时,双方都需要重新洗牌。我用上面的数据来测试洗牌和排序之间的时间没有显著差异

最新更新