我有两种情况,我23 GB
parquet
数据分区,并提前读取少量columns
和caching
数据,以便稍后触发一系列后续查询。
设置:
- 集群:12 节点 EMR
- 火花版本:1.6
- Spark 配置:默认
- 运行配置:两种情况相同
案例1:
val paths = Array("s3://my/parquet/path", ...)
val parqFile = sqlContext.read.parquet(paths:_*)
parqFile.registerTempTable("productViewBase")
val dfMain = sqlContext.sql("select guid,email,eventKey,timestamp,pogId from productViewBase")
dfMain.cache.count
从SparkUI
,输入数据读取为6.2 GB,缓存对象为15.1 GB。
案例1:
val paths = Array("s3://my/parquet/path", ...)
val parqFile = sqlContext.read.parquet(paths:_*)
parqFile.registerTempTable("productViewBase")
val dfMain = sqlContext.sql("select guid,email,eventKey,timestamp,pogId from productViewBase order by pogId")
dfMain.cache.count
从SparkUI
,输入数据读取是6.2 GB,缓存对象是5.5 GB。
对此行为的任何解释或代码引用?
它实际上相对简单。正如您可以在 SQL 指南中读到的:
Spark SQL可以使用内存中的列格式缓存表...Spark SQL 将仅扫描必需的列,并自动调整压缩
排序列式存储的好处是,它很容易压缩典型数据。当您排序时,您会得到这些相似记录的块,甚至可以使用非常简单的技术(如RLE(将它们挤压在一起。
这是一个实际上在具有列式存储的数据库中经常使用的属性,因为它不仅在存储方面非常有效,而且在聚合方面也非常有效。
sql.execution.columnar.compression
包涵盖了 Spark 列式压缩的不同方面,正如您所看到的RunLengthEncoding
确实是可用的压缩方案之一。
所以这里有两部分:
-
Spark可以根据统计数据动态调整压缩方法:
Spark SQL将根据数据的统计信息自动为每列选择压缩编解码器。
-
排序可以将相似的记录聚集在一起,从而使压缩更加高效。
如果列之间存在一些相关性(当情况并非如此时?(,即使是基于单个列的简单排序也会产生相对较大的影响并提高不同压缩方案的性能。