Spark是否支持对S3中的拼花文件进行真正的列扫描?



Parquet数据存储格式的一大优点是它是列式的。如果我有一个具有数百列的"宽"数据集,但我的查询只涉及其中的几个,那么可能只读取存储这几个列的数据,并跳过其余的。

这个特性的工作原理大概是:在指明每一列在文件系统上的位置的parquet文件的开头读取一些元数据。然后,读取器可以在磁盘上查找,只读取必要的列。

有谁知道spark的默认拼字阅读器是否正确地在S3上实现了这种选择性查找?我认为S3支持它,但是理论支持和正确利用这种支持的实现之间存在很大的差异。

需要分解

  1. Parquet代码是否从spark获得谓词(yes)
  2. parquet然后尝试选择性地只读那些列,使用Hadoop FileSystem seek() + read()readFully(position, buffer, length)调用?是的
  3. S3连接器是否将这些文件操作转换为有效的HTTP GET请求?在亚马逊电子病历中:是的。在Apache Hadoop中,你需要在类路径上设置Hadoop 2.8,并设置正确的spark.hadoop.fs.s3a.experimental.fadvise=random来触发随机访问。

Hadoop 2.7和更早的版本对文件的主动seek()处理得很糟糕,因为它们总是启动一个GET offset-end- file,对下一个seek感到惊讶,不得不中止该连接,重新打开一个新的TCP/HTTPS 1.1连接(慢,CPU重),重复这样做。随机IO操作不利于批量加载。csv.gz之类的东西,但对于获得ORC/Parquet性能至关重要。

在Hadoop 2.7的Hadoop -aws JAR中没有得到加速。如果你需要它,你需要更新hadoop*.jar和依赖项,或者在hadoop 2.8上从头开始构建Spark

注意Hadoop 2.8+还有一个很好的小特性:如果在日志语句中调用S3A文件系统客户端上的toString(),它会打印出所有的文件系统IO统计数据,包括在seek中丢弃的数据量,终止的TCP连接等。帮你弄清楚发生了什么

2018-04-13警告::不要试图将Hadoop 2.8+ hadoop-aws JAR与Hadoop -2.7 JAR集的其余部分一起放在类路径上,并期望看到任何加速。您将看到的只是堆栈跟踪。您需要更新所有hadoop jar及其传递依赖项。

免责声明:我没有一个明确的答案,也不想作为一个权威的来源,但是我花了一些时间在Spark 2.2+中的镶木地板支持上,我希望我的答案可以帮助我们所有人更接近正确的答案。


Parquet在S3上是否避免从S3中提取未使用的列的数据,只检索它需要的文件块,还是提取整个文件?

我使用Spark 2.3.0-SNAPSHOT,我今天从主构建的。

parquet数据源格式由ParquetFileFormat处理,ParquetFileFormat是一种FileFormat。

如果我是正确的,读取部分是由buildReaderWithPartitionValues方法(覆盖FileFormat)处理的。

buildReaderWithPartitionValues仅在FileSourceScanExec物理运算符被请求用于所谓的输入RDD时使用,而这些RDD实际上是一个单独的RDD,当WholeStageCodegenExec执行时生成内部行。

话虽如此,我认为回顾buildReaderWithPartitionValues所做的可能会让我们更接近最终答案。

当你看这一行时,你可以确信我们在正确的轨道上。

//当过滤器下推开启时,尝试下推过滤器

该代码路径依赖于默认开启的spark.sql.parquet.filterPushdown Spark属性。

spark.sql.parquet。filterPushdown当设置为true时,使Parquet过滤器下推优化。

这就引出了parquet-hadoop的ParquetInputFormat。setFilterPredicate 如果过滤器已经定义。

if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
当代码回落到parquet-mr(而不是使用所谓的向量化parquet解码阅读器)时,使用过滤器时,代码变得更有趣。这是我不太明白的部分(除了我在代码中看到的部分)。

请注意,向量化拼花解码阅读器是由spark.sql.parquet.enableVectorizedReader Spark属性控制的,该属性默认是打开的。

提示:要知道使用了if表达式的哪个部分,请为org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat记录器启用DEBUG日志级别。

为了查看所有下推过滤器,您可以打开org.apache.spark.sql.execution.FileSourceScanExec记录器的INFO日志级别。您应该在日志中看到以下内容:

INFO Pushed Filters: [pushedDownFilters]

我确实希望,如果它不是接近一个明确的答案,它有一点帮助,有人捡起我离开的地方,使它很快。最后的希望:)

spark的拼花阅读器就像任何其他InputFormat,

  1. 没有任何的输入格式有任何特殊的S3。输入格式可以从LocalFileSystem、Hdfs和S3读取,无需为此进行特殊优化。

  2. Parquet InpuTFormat将根据您请求的列选择性地为您读取列。

  3. 如果您想要完全确定(虽然下推谓词在最新的spark版本中工作)手动选择列并编写转换和操作,而不是依赖于SQL

不完全支持谓词下推。当然,这取决于:

  • 具体用例
  • <
  • 火花版本/gh>
  • S3连接器类型和版本

为了检查您的特定用例,您可以在Spark中启用DEBUG日志级别,并运行您的查询。然后,您可以看到在S3 (HTTP)请求期间是否存在"查找",以及实际发送了多少请求。像这样:

17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[r][n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[r][n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[r][n]"

下面是最近打开的一个问题报告的例子,由于Spark 2.1无法根据Parquet文件中存储的元数据计算数据集中所有行的COUNT(*): https://issues.apache.org/jira/browse/SPARK-21074

相关内容

  • 没有找到相关文章

最新更新