我正在阅读parquet数据,我看到它列出了驱动程序侧的所有目录
Listing s3://xxxx/defloc/warehouse/products_parquet_151/month=2016-01 on driver
Listing s3://xxxx/defloc/warehouse/products_parquet_151/month=2014-12 on driver
我在where子句中指定了month=2014-12。我试过使用spark sql和数据框架API,看起来都没有修剪分区。
使用Dataframe APIdf.filter("month='2014-12'").show()
使用Spark SQL
sqlContext.sql("select name, price from products_parquet_151 where month = '2014-12'")
我已经在1.5.1,1.6.1和2.0.0版本上尝试了上面的
Spark需要首先在驱动程序中加载分区元数据,以知道分区是否存在。Spark将查询该目录以查找现有分区,以确定在扫描数据期间是否可以修剪该分区。
我已经在Spark 2.0上进行了测试,您可以在日志消息中看到。
16/10/14 17:23:37 TRACE ListingFileCatalog: Listing s3a://mybucket/reddit_year on driver
16/10/14 17:23:37 TRACE ListingFileCatalog: Listing s3a://mybucket/reddit_year/year=2007 on driver
这并不意味着我们扫描每个分区中的文件,但是Spark将存储分区的位置,以便将来在表上查询。
您可以看到它实际上正在传递分区过滤器来修剪数据的日志:
16/10/14 17:23:48 TRACE ListingFileCatalog: Partition spec: PartitionSpec(StructType(StructField(year,IntegerType,true)),ArrayBuffer(PartitionDirectory([2012],s3a://mybucket/reddit_year/year=2012), PartitionDirectory([2010],s3a://mybucket/reddit_year/year=2010), ...PartitionDirectory([2015],s3a://mybucket/reddit_year/year=2015), PartitionDirectory([2011],s3a://mybucket/reddit_year/year=2011)))
16/10/14 17:23:48 INFO ListingFileCatalog: Selected 1 partitions out of 9, pruned 88.88888888888889% partitions.
如果你在查询中运行explain(True)
,你可以在逻辑计划中看到这一点:spark.sql("select created_utc, score, name from reddit where year = '2014'").explain(True)
这将显示计划,您可以看到它正在计划的底部进行过滤:
+- *BatchedScan parquet [created_utc#58,name#65,score#69L,year#74] Format: ParquetFormat, InputPaths: s3a://mybucket/reddit_year, PartitionFilters: [isnotnull(year#74), (cast(year#74 as double) = 2014.0)], PushedFilters: [], ReadSchema: struct<created_utc:string,name:string,score:bigint>
Spark有机会在通过Hive时改进其分区修剪;看到火花- 17179。
如果您只是直接访问对象存储,那么问题是针对对象存储的递归目录操作是真正的性能杀手。我和我的同事已经在HADOOP-11694的S3A客户端上完成了工作,现在需要对Spark进行更改,以采用我们已经能够修复的特定API调用。为此,我们需要确保我们使用的是具有真实世界布局的真实数据集,所以不要针对特定的示例/基准进行优化。
目前,人们应该选择具有浅目录树的分区布局。